risingwave_rpc_client/meta_client.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

type ConnectionId = u32;
type DatabaseId = u32;
type SchemaId = u32;

/// Client to meta server. Cloning the instance is lightweight.
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: u32,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}
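
// Illustrative sketch (not part of the original source): because `MetaClient`
// is `Clone` and cloning is lightweight, a caller might hand a clone to each
// task that needs meta access instead of adding another layer of `Arc`. The
// task body below is an assumption for demonstration only.
//
//     let client_for_task = meta_client.clone();
//     tokio::spawn(async move {
//         // Any of the RPC helpers defined below could be called here.
//         let _ = client_for_task.risectl_list_state_tables().await;
//     });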

impl MetaClient {
    pub fn worker_id(&self) -> u32 {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

    /// Subscribe to notification from meta.
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }
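
    // Illustrative sketch (not from the original source): one way a caller
    // might drain the notification stream returned by `subscribe`. The chosen
    // `SubscribeType` variant, the error handling, and `handle_notification`
    // are assumptions for demonstration only.
    //
    //     let mut stream = meta_client.subscribe(SubscribeType::Frontend).await?;
    //     while let Some(resp) = stream.message().await? {
    //         handle_notification(resp); // hypothetical handler
    //     }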

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest {
            secret_id: secret_id.into(),
        };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    /// Register the current node to the cluster and set the corresponding worker id.
    ///
    /// Retry if there's connection issue with the meta node. Exit the process if the registration fails.
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        // Retry until reaching `max_heartbeat_interval_secs`
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Only retry if there's any transient connection issue.
            // If the error is from our implementation or business, do not retry it.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report panic event to meta node.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

    /// Activate the current node in cluster to confirm it's ready to serve.
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }
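
    // Illustrative sketch (not from the original source) of the typical
    // startup sequence: register with the meta service, then activate once the
    // node is ready to serve. `addr_strategy`, `listen_addr`, `meta_config`,
    // the worker type, and the default `Property` are placeholders.
    //
    //     let (meta_client, system_params) = MetaClient::register_new(
    //         addr_strategy,
    //         WorkerType::ComputeNode,
    //         &listen_addr,
    //         Default::default(),
    //         meta_config,
    //     )
    //     .await;
    //     meta_client.activate(&listen_addr).await?;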

    /// Send heartbeat signal to meta service.
    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Ignore the error if we're already shutting down.
            // Otherwise, exit the process.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest {
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: u32,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: u32,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(SourceId::Id),
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: u32) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: u32) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: u32,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest {
            index_id: index_id.index_id,
            cascade,
        };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id: function_id.0,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    // TODO: use UserInfoVersion as the return type instead.
    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

    /// Unregister the current node from the cluster.
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    /// Try to unregister the current worker from the cluster with best effort. Log the result.
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[u32],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

    /// Starts a heartbeat worker.
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Wait for interval
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    // TODO: decide better min_interval for timeout
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }
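
    // Illustrative sketch (not from the original source): spawning the
    // heartbeat loop after activation and tearing it down on shutdown. The
    // one-second interval and the shutdown ordering are assumptions for
    // demonstration only.
    //
    //     let (heartbeat_handle, heartbeat_shutdown) =
    //         MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    //     // ... later, on shutdown:
    //     let _ = heartbeat_shutdown.send(());
    //     heartbeat_handle.await.ok();
    //     meta_client.try_unregister().await;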

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: table_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: u32,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

1328    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1329        let req = DisableCommitEpochRequest {};
1330        Ok(HummockVersion::from_rpc_protobuf(
1331            &self
1332                .inner
1333                .disable_commit_epoch(req)
1334                .await?
1335                .current_version
1336                .unwrap(),
1337        ))
1338    }
1339
1340    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1341        let req = GetAssignedCompactTaskNumRequest {};
1342        let resp = self.inner.get_assigned_compact_task_num(req).await?;
1343        Ok(resp.num_tasks as usize)
1344    }
1345
1346    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1347        let req = RiseCtlListCompactionGroupRequest {};
1348        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1349        Ok(resp.compaction_groups)
1350    }
1351
1352    pub async fn risectl_update_compaction_config(
1353        &self,
1354        compaction_groups: &[CompactionGroupId],
1355        configs: &[MutableConfig],
1356    ) -> Result<()> {
1357        let req = RiseCtlUpdateCompactionConfigRequest {
1358            compaction_group_ids: compaction_groups.to_vec(),
1359            configs: configs
1360                .iter()
1361                .map(
1362                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1363                        mutable_config: Some(c.clone()),
1364                    },
1365                )
1366                .collect(),
1367        };
1368        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1369        Ok(())
1370    }
1371
1372    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1373        let req = BackupMetaRequest { remarks };
1374        let resp = self.inner.backup_meta(req).await?;
1375        Ok(resp.job_id)
1376    }
1377
1378    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1379        let req = GetBackupJobStatusRequest { job_id };
1380        let resp = self.inner.get_backup_job_status(req).await?;
1381        Ok((resp.job_status(), resp.message))
1382    }
1383
1384    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1385        let req = DeleteMetaSnapshotRequest {
1386            snapshot_ids: snapshot_ids.to_vec(),
1387        };
1388        let _resp = self.inner.delete_meta_snapshot(req).await?;
1389        Ok(())
1390    }
1391
1392    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1393        let req = GetMetaSnapshotManifestRequest {};
1394        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1395        Ok(resp.manifest.expect("should exist"))
1396    }
1397
1398    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1399        let req = GetTelemetryInfoRequest {};
1400        let resp = self.inner.get_telemetry_info(req).await?;
1401        Ok(resp)
1402    }
1403
1404    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1405        let req = GetMetaStoreInfoRequest {};
1406        let resp = self.inner.get_meta_store_info(req).await?;
1407        Ok(resp.meta_store_endpoint)
1408    }
1409
1410    pub async fn alter_sink_props(
1411        &self,
1412        sink_id: u32,
1413        changed_props: BTreeMap<String, String>,
1414        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1415        connector_conn_ref: Option<u32>,
1416    ) -> Result<()> {
1417        let req = AlterConnectorPropsRequest {
1418            object_id: sink_id,
1419            changed_props: changed_props.into_iter().collect(),
1420            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1421            connector_conn_ref,
1422            object_type: AlterConnectorPropsObject::Sink as i32,
1423            extra_options: None,
1424        };
1425        let _resp = self.inner.alter_connector_props(req).await?;
1426        Ok(())
1427    }
1428
1429    pub async fn alter_iceberg_table_props(
1430        &self,
1431        table_id: u32,
1432        sink_id: u32,
1433        source_id: u32,
1434        changed_props: BTreeMap<String, String>,
1435        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1436        connector_conn_ref: Option<u32>,
1437    ) -> Result<()> {
1438        let req = AlterConnectorPropsRequest {
1439            object_id: table_id,
1440            changed_props: changed_props.into_iter().collect(),
1441            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1442            connector_conn_ref,
1443            object_type: AlterConnectorPropsObject::IcebergTable as i32,
1444            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1445                sink_id: sink_id as i32,
1446                source_id: source_id as i32,
1447            })),
1448        };
1449        let _resp = self.inner.alter_connector_props(req).await?;
1450        Ok(())
1451    }
1452
1453    pub async fn alter_source_connector_props(
1454        &self,
1455        source_id: u32,
1456        changed_props: BTreeMap<String, String>,
1457        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1458        connector_conn_ref: Option<u32>,
1459    ) -> Result<()> {
1460        let req = AlterConnectorPropsRequest {
1461            object_id: source_id,
1462            changed_props: changed_props.into_iter().collect(),
1463            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1464            connector_conn_ref,
1465            object_type: AlterConnectorPropsObject::Source as i32,
1466            extra_options: None,
1467        };
1468        let _resp = self.inner.alter_connector_props(req).await?;
1469        Ok(())
1470    }
1471
1472    pub async fn set_system_param(
1473        &self,
1474        param: String,
1475        value: Option<String>,
1476    ) -> Result<Option<SystemParamsReader>> {
1477        let req = SetSystemParamRequest { param, value };
1478        let resp = self.inner.set_system_param(req).await?;
1479        Ok(resp.params.map(SystemParamsReader::from))
1480    }
1481
1482    pub async fn get_session_params(&self) -> Result<String> {
1483        let req = GetSessionParamsRequest {};
1484        let resp = self.inner.get_session_params(req).await?;
1485        Ok(resp.params)
1486    }
1487
1488    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1489        let req = SetSessionParamRequest { param, value };
1490        let resp = self.inner.set_session_param(req).await?;
1491        Ok(resp.param)
1492    }
1493
1494    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1495        let req = GetDdlProgressRequest {};
1496        let resp = self.inner.get_ddl_progress(req).await?;
1497        Ok(resp.ddl_progress)
1498    }
1499
1500    pub async fn split_compaction_group(
1501        &self,
1502        group_id: CompactionGroupId,
1503        table_ids_to_new_group: &[StateTableId],
1504        partition_vnode_count: u32,
1505    ) -> Result<CompactionGroupId> {
1506        let req = SplitCompactionGroupRequest {
1507            group_id,
1508            table_ids: table_ids_to_new_group.to_vec(),
1509            partition_vnode_count,
1510        };
1511        let resp = self.inner.split_compaction_group(req).await?;
1512        Ok(resp.new_group_id)
1513    }
1514
1515    pub async fn get_tables(
1516        &self,
1517        table_ids: &[u32],
1518        include_dropped_tables: bool,
1519    ) -> Result<HashMap<u32, Table>> {
1520        let req = GetTablesRequest {
1521            table_ids: table_ids.to_vec(),
1522            include_dropped_tables,
1523        };
1524        let resp = self.inner.get_tables(req).await?;
1525        Ok(resp.tables)
1526    }
1527
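    /// List the serving vnode mappings, keyed by fragment id. Each entry carries the id of the
    /// table the fragment belongs to (0 if unknown) and its [`WorkerSlotMapping`].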
1528    pub async fn list_serving_vnode_mappings(
1529        &self,
1530    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
1531        let req = GetServingVnodeMappingsRequest {};
1532        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1533        let mappings = resp
1534            .worker_slot_mappings
1535            .into_iter()
1536            .map(|p| {
1537                (
1538                    p.fragment_id,
1539                    (
1540                        resp.fragment_to_table
1541                            .get(&p.fragment_id)
1542                            .cloned()
1543                            .unwrap_or(0),
1544                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1545                    ),
1546                )
1547            })
1548            .collect();
1549        Ok(mappings)
1550    }
1551
1552    pub async fn risectl_list_compaction_status(
1553        &self,
1554    ) -> Result<(
1555        Vec<CompactStatus>,
1556        Vec<CompactTaskAssignment>,
1557        Vec<CompactTaskProgress>,
1558    )> {
1559        let req = RiseCtlListCompactionStatusRequest {};
1560        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1561        Ok((
1562            resp.compaction_statuses,
1563            resp.task_assignment,
1564            resp.task_progress,
1565        ))
1566    }
1567
1568    pub async fn get_compaction_score(
1569        &self,
1570        compaction_group_id: CompactionGroupId,
1571    ) -> Result<Vec<PickerInfo>> {
1572        let req = GetCompactionScoreRequest {
1573            compaction_group_id,
1574        };
1575        let resp = self.inner.get_compaction_score(req).await?;
1576        Ok(resp.scores)
1577    }
1578
1579    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1580        let req = RiseCtlRebuildTableStatsRequest {};
1581        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1582        Ok(())
1583    }
1584
1585    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1586        let req = ListBranchedObjectRequest {};
1587        let resp = self.inner.list_branched_object(req).await?;
1588        Ok(resp.branched_objects)
1589    }
1590
1591    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
1592        let req = ListActiveWriteLimitRequest {};
1593        let resp = self.inner.list_active_write_limit(req).await?;
1594        Ok(resp.write_limits)
1595    }
1596
1597    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1598        let req = ListHummockMetaConfigRequest {};
1599        let resp = self.inner.list_hummock_meta_config(req).await?;
1600        Ok(resp.configs)
1601    }
1602
1603    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1604        let _resp = self
1605            .inner
1606            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1607            .await?;
1608
1609        Ok(())
1610    }
1611
1612    pub async fn rw_cloud_validate_source(
1613        &self,
1614        source_type: SourceType,
1615        source_config: HashMap<String, String>,
1616    ) -> Result<RwCloudValidateSourceResponse> {
1617        let req = RwCloudValidateSourceRequest {
1618            source_type: source_type.into(),
1619            source_config,
1620        };
1621        let resp = self.inner.rw_cloud_validate_source(req).await?;
1622        Ok(resp)
1623    }
1624
1625    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1626        self.inner.core.read().await.sink_coordinate_client.clone()
1627    }
1628
1629    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1630        let req = ListCompactTaskAssignmentRequest {};
1631        let resp = self.inner.list_compact_task_assignment(req).await?;
1632        Ok(resp.task_assignment)
1633    }
1634
1635    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1636        let req = ListEventLogRequest::default();
1637        let resp = self.inner.list_event_log(req).await?;
1638        Ok(resp.event_logs)
1639    }
1640
1641    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1642        let req = ListCompactTaskProgressRequest {};
1643        let resp = self.inner.list_compact_task_progress(req).await?;
1644        Ok(resp.task_progress)
1645    }
1646
1647    #[cfg(madsim)]
1648    pub fn try_add_panic_event_blocking(
1649        &self,
1650        panic_info: impl Display,
1651        timeout_millis: Option<u64>,
1652    ) {
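        // No-op under madsim: panic events are not reported in deterministic simulation tests.
        let _ = (panic_info, timeout_millis);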
1653    }
1654
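    /// Report a worker-node panic to the meta node's event log. The request is issued from a
    /// dedicated thread running its own current-thread Tokio runtime, and this call blocks until
    /// the event is sent or the timeout elapses, so it can be used where no async runtime is
    /// available.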
1655    /// If `timeout_millis` is `None`, a default of 1000 ms is used.
1656    #[cfg(not(madsim))]
1657    pub fn try_add_panic_event_blocking(
1658        &self,
1659        panic_info: impl Display,
1660        timeout_millis: Option<u64>,
1661    ) {
1662        let event = event_log::EventWorkerNodePanic {
1663            worker_id: self.worker_id,
1664            worker_type: self.worker_type.into(),
1665            host_addr: Some(self.host_addr.to_protobuf()),
1666            panic_info: format!("{panic_info}"),
1667        };
1668        let grpc_meta_client = self.inner.clone();
1669        let _ = thread::spawn(move || {
1670            let rt = tokio::runtime::Builder::new_current_thread()
1671                .enable_all()
1672                .build()
1673                .unwrap();
1674            let req = AddEventLogRequest {
1675                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1676            };
1677            rt.block_on(async {
1678                let _ = tokio::time::timeout(
1679                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1680                    grpc_meta_client.add_event_log(req),
1681                )
1682                .await;
1683            });
1684        })
1685        .join();
1686    }
1687
1688    pub async fn add_sink_fail_event(
1689        &self,
1690        sink_id: u32,
1691        sink_name: String,
1692        connector: String,
1693        error: String,
1694    ) -> Result<()> {
1695        let event = event_log::EventSinkFail {
1696            sink_id,
1697            sink_name,
1698            connector,
1699            error,
1700        };
1701        let req = AddEventLogRequest {
1702            event: Some(add_event_log_request::Event::SinkFail(event)),
1703        };
1704        self.inner.add_event_log(req).await?;
1705        Ok(())
1706    }
1707
1708    pub async fn add_cdc_auto_schema_change_fail_event(
1709        &self,
1710        table_id: u32,
1711        table_name: String,
1712        cdc_table_id: String,
1713        upstream_ddl: String,
1714        fail_info: String,
1715    ) -> Result<()> {
1716        let event = event_log::EventAutoSchemaChangeFail {
1717            table_id,
1718            table_name,
1719            cdc_table_id,
1720            upstream_ddl,
1721            fail_info,
1722        };
1723        let req = AddEventLogRequest {
1724            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1725        };
1726        self.inner.add_event_log(req).await?;
1727        Ok(())
1728    }
1729
1730    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1731        let req = CancelCompactTaskRequest {
1732            task_id,
1733            task_status: task_status as _,
1734        };
1735        let resp = self.inner.cancel_compact_task(req).await?;
1736        Ok(resp.ret)
1737    }
1738
1739    pub async fn get_version_by_epoch(
1740        &self,
1741        epoch: HummockEpoch,
1742        table_id: u32,
1743    ) -> Result<PbHummockVersion> {
1744        let req = GetVersionByEpochRequest { epoch, table_id };
1745        let resp = self.inner.get_version_by_epoch(req).await?;
1746        Ok(resp.version.unwrap())
1747    }
1748
1749    pub async fn get_cluster_limits(
1750        &self,
1751    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1752        let req = GetClusterLimitsRequest {};
1753        let resp = self.inner.get_cluster_limits(req).await?;
1754        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1755    }
1756
1757    pub async fn merge_compaction_group(
1758        &self,
1759        left_group_id: CompactionGroupId,
1760        right_group_id: CompactionGroupId,
1761    ) -> Result<()> {
1762        let req = MergeCompactionGroupRequest {
1763            left_group_id,
1764            right_group_id,
1765        };
1766        self.inner.merge_compaction_group(req).await?;
1767        Ok(())
1768    }
1769
1770    /// List all rate limits for sources and backfills.
1771    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1772        let request = ListRateLimitsRequest {};
1773        let resp = self.inner.list_rate_limits(request).await?;
1774        Ok(resp.rate_limits)
1775    }
1776
1777    pub async fn list_cdc_progress(&self) -> Result<HashMap<u32, PbCdcProgress>> {
1778        let request = ListCdcProgressRequest {};
1779        let resp = self.inner.list_cdc_progress(request).await?;
1780        Ok(resp.cdc_progress)
1781    }
1782
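    /// Create an Iceberg table in a single request, bundling the table job, its sink job, and the
    /// backing Iceberg source. Returns the `WaitVersion` reported by the meta node.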
1783    pub async fn create_iceberg_table(
1784        &self,
1785        table_job_info: PbTableJobInfo,
1786        sink_job_info: PbSinkJobInfo,
1787        iceberg_source: PbSource,
1788        if_not_exists: bool,
1789    ) -> Result<WaitVersion> {
1790        let request = CreateIcebergTableRequest {
1791            table_info: Some(table_job_info),
1792            sink_info: Some(sink_job_info),
1793            iceberg_source: Some(iceberg_source),
1794            if_not_exists,
1795        };
1796
1797        let resp = self.inner.create_iceberg_table(request).await?;
1798        Ok(resp
1799            .version
1800            .ok_or_else(|| anyhow!("wait version not set"))?)
1801    }
1802
1803    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1804        let request = ListIcebergTablesRequest {};
1805        let resp = self.inner.list_iceberg_tables(request).await?;
1806        Ok(resp.iceberg_tables)
1807    }
1808
1809    /// Get the await tree of all nodes in the cluster.
1810    pub async fn get_cluster_stack_trace(
1811        &self,
1812        actor_traces_format: ActorTracesFormat,
1813    ) -> Result<StackTraceResponse> {
1814        let request = StackTraceRequest {
1815            actor_traces_format: actor_traces_format as i32,
1816        };
1817        let resp = self.inner.stack_trace(request).await?;
1818        Ok(resp)
1819    }
1820
1821    pub async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
1822        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
1823        self.inner.set_sync_log_store_aligned(request).await?;
1824        Ok(())
1825    }
1826
1827    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
1828        self.inner.refresh(request).await
1829    }
1830}
1831
1832#[async_trait]
1833impl HummockMetaClient for MetaClient {
1834    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1835        let req = UnpinVersionBeforeRequest {
1836            context_id: self.worker_id(),
1837            unpin_version_before: unpin_version_before.to_u64(),
1838        };
1839        self.inner.unpin_version_before(req).await?;
1840        Ok(())
1841    }
1842
1843    async fn get_current_version(&self) -> Result<HummockVersion> {
1844        let req = GetCurrentVersionRequest::default();
1845        Ok(HummockVersion::from_rpc_protobuf(
1846            &self
1847                .inner
1848                .get_current_version(req)
1849                .await?
1850                .current_version
1851                .unwrap(),
1852        ))
1853    }
1854
1855    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1856        let resp = self
1857            .inner
1858            .get_new_object_ids(GetNewObjectIdsRequest { number })
1859            .await?;
1860        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1861    }
1862
1863    async fn commit_epoch_with_change_log(
1864        &self,
1865        _epoch: HummockEpoch,
1866        _sync_result: SyncResult,
1867        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1868    ) -> Result<()> {
1869        panic!("Only meta service can commit_epoch in production.")
1870    }
1871
1872    async fn trigger_manual_compaction(
1873        &self,
1874        compaction_group_id: u64,
1875        table_id: u32,
1876        level: u32,
1877        sst_ids: Vec<u64>,
1878    ) -> Result<()> {
1879        // TODO: support key_range parameter
1880        let req = TriggerManualCompactionRequest {
1881            compaction_group_id,
1882            table_id,
1883            // If `table_id` does not exist, the manual compaction will include all the SSTs
1884            // without checking `internal_table_id`.
1885            level,
1886            sst_ids,
1887            ..Default::default()
1888        };
1889
1890        self.inner.trigger_manual_compaction(req).await?;
1891        Ok(())
1892    }
1893
1894    async fn trigger_full_gc(
1895        &self,
1896        sst_retention_time_sec: u64,
1897        prefix: Option<String>,
1898    ) -> Result<()> {
1899        self.inner
1900            .trigger_full_gc(TriggerFullGcRequest {
1901                sst_retention_time_sec,
1902                prefix,
1903            })
1904            .await?;
1905        Ok(())
1906    }
1907
1908    async fn subscribe_compaction_event(
1909        &self,
1910    ) -> Result<(
1911        UnboundedSender<SubscribeCompactionEventRequest>,
1912        BoxStream<'static, CompactionEventItem>,
1913    )> {
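        // Queue a `Register` event carrying this worker's id so that it becomes the first message
        // on the stream, then open the bidirectional compaction-event RPC.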
1914        let (request_sender, request_receiver) =
1915            unbounded_channel::<SubscribeCompactionEventRequest>();
1916        request_sender
1917            .send(SubscribeCompactionEventRequest {
1918                event: Some(subscribe_compaction_event_request::Event::Register(
1919                    Register {
1920                        context_id: self.worker_id(),
1921                    },
1922                )),
1923                create_at: SystemTime::now()
1924                    .duration_since(SystemTime::UNIX_EPOCH)
1925                    .expect("Clock may have gone backwards")
1926                    .as_millis() as u64,
1927            })
1928            .context("Failed to subscribe compaction event")?;
1929
1930        let stream = self
1931            .inner
1932            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1933                request_receiver,
1934            )))
1935            .await?;
1936
1937        Ok((request_sender, Box::pin(stream)))
1938    }
1939
1940    async fn get_version_by_epoch(
1941        &self,
1942        epoch: HummockEpoch,
1943        table_id: u32,
1944    ) -> Result<PbHummockVersion> {
1945        self.get_version_by_epoch(epoch, table_id).await
1946    }
1947
1948    async fn subscribe_iceberg_compaction_event(
1949        &self,
1950    ) -> Result<(
1951        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1952        BoxStream<'static, IcebergCompactionEventItem>,
1953    )> {
1954        let (request_sender, request_receiver) =
1955            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
1956        request_sender
1957            .send(SubscribeIcebergCompactionEventRequest {
1958                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
1959                    IcebergRegister {
1960                        context_id: self.worker_id(),
1961                    },
1962                )),
1963                create_at: SystemTime::now()
1964                    .duration_since(SystemTime::UNIX_EPOCH)
1965                    .expect("Clock may have gone backwards")
1966                    .as_millis() as u64,
1967            })
1968            .context("Failed to subscribe iceberg compaction event")?;
1969
1970        let stream = self
1971            .inner
1972            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
1973                request_receiver,
1974            )))
1975            .await?;
1976
1977        Ok((request_sender, Box::pin(stream)))
1978    }
1979}
1980
1981#[async_trait]
1982impl TelemetryInfoFetcher for MetaClient {
1983    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
1984        let resp = self
1985            .get_telemetry_info()
1986            .await
1987            .map_err(|e| e.to_report_string())?;
1988        let tracking_id = resp.get_tracking_id().ok();
1989        Ok(tracking_id.map(|id| id.to_owned()))
1990    }
1991}
1992
1993pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
1994
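/// One tonic client per meta RPC service, all sharing a single underlying [`Channel`]. The whole
/// struct is rebuilt on a fresh channel when the meta leader changes (see [`MetaMemberManagement`]).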
1995#[derive(Debug, Clone)]
1996struct GrpcMetaClientCore {
1997    cluster_client: ClusterServiceClient<Channel>,
1998    meta_member_client: MetaMemberServiceClient<Channel>,
1999    heartbeat_client: HeartbeatServiceClient<Channel>,
2000    ddl_client: DdlServiceClient<Channel>,
2001    hummock_client: HummockManagerServiceClient<Channel>,
2002    notification_client: NotificationServiceClient<Channel>,
2003    stream_client: StreamManagerServiceClient<Channel>,
2004    user_client: UserServiceClient<Channel>,
2005    scale_client: ScaleServiceClient<Channel>,
2006    backup_client: BackupServiceClient<Channel>,
2007    telemetry_client: TelemetryInfoServiceClient<Channel>,
2008    system_params_client: SystemParamsServiceClient<Channel>,
2009    session_params_client: SessionParamServiceClient<Channel>,
2010    serving_client: ServingServiceClient<Channel>,
2011    cloud_client: CloudServiceClient<Channel>,
2012    sink_coordinate_client: SinkCoordinationRpcClient,
2013    event_log_client: EventLogServiceClient<Channel>,
2014    cluster_limit_client: ClusterLimitServiceClient<Channel>,
2015    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
2016    monitor_client: MonitorServiceClient<Channel>,
2017}
2018
2019impl GrpcMetaClientCore {
2020    pub(crate) fn new(channel: Channel) -> Self {
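        // All service clients are built from clones of the same channel. Clients that may receive
        // large responses lift tonic's default 4 MiB decoding limit via `max_decoding_message_size`.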
2021        let cluster_client = ClusterServiceClient::new(channel.clone());
2022        let meta_member_client = MetaMemberClient::new(channel.clone());
2023        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2024        let ddl_client =
2025            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2026        let hummock_client =
2027            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2028        let notification_client =
2029            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2030        let stream_client =
2031            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2032        let user_client = UserServiceClient::new(channel.clone());
2033        let scale_client =
2034            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2035        let backup_client = BackupServiceClient::new(channel.clone());
2036        let telemetry_client =
2037            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2038        let system_params_client = SystemParamsServiceClient::new(channel.clone());
2039        let session_params_client = SessionParamServiceClient::new(channel.clone());
2040        let serving_client = ServingServiceClient::new(channel.clone());
2041        let cloud_client = CloudServiceClient::new(channel.clone());
2042        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2043            .max_decoding_message_size(usize::MAX);
2044        let event_log_client = EventLogServiceClient::new(channel.clone());
2045        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2046        let hosted_iceberg_catalog_service_client =
2047            HostedIcebergCatalogServiceClient::new(channel.clone());
2048        let monitor_client = MonitorServiceClient::new(channel);
2049
2050        GrpcMetaClientCore {
2051            cluster_client,
2052            meta_member_client,
2053            heartbeat_client,
2054            ddl_client,
2055            hummock_client,
2056            notification_client,
2057            stream_client,
2058            user_client,
2059            scale_client,
2060            backup_client,
2061            telemetry_client,
2062            system_params_client,
2063            session_params_client,
2064            serving_client,
2065            cloud_client,
2066            sink_coordinate_client,
2067            event_log_client,
2068            cluster_limit_client,
2069            hosted_iceberg_catalog_service_client,
2070            monitor_client,
2071        }
2072    }
2073}
2074
2075/// Client to meta server. Cloning the instance is lightweight.
2076///
2077/// It is a wrapper of tonic client. See [`crate::meta_rpc_client_method_impl`].
2078#[derive(Debug, Clone)]
2079struct GrpcMetaClient {
2080    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
2081    core: Arc<RwLock<GrpcMetaClientCore>>,
2082}
2083
2084type MetaMemberClient = MetaMemberServiceClient<Channel>;
2085
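/// LRU cache of meta member service clients keyed by address; `None` entries are connected lazily
/// on the next refresh.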
2086struct MetaMemberGroup {
2087    members: LruCache<http::Uri, Option<MetaMemberClient>>,
2088}
2089
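/// Tracks the meta member list and the current leader, and rebuilds [`GrpcMetaClientCore`] on a
/// fresh channel whenever a new leader is discovered.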
2090struct MetaMemberManagement {
2091    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
2092    members: Either<MetaMemberClient, MetaMemberGroup>,
2093    current_leader: http::Uri,
2094    meta_config: Arc<MetaConfig>,
2095}
2096
2097impl MetaMemberManagement {
2098    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2099
2100    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2101        format!("http://{}:{}", addr.host, addr.port)
2102            .parse()
2103            .unwrap()
2104    }
2105
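    /// Replace every service client in the shared core with ones bound to the given channel.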
2106    async fn recreate_core(&self, channel: Channel) {
2107        let mut core = self.core_ref.write().await;
2108        *core = GrpcMetaClientCore::new(channel);
2109    }
2110
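    /// Fetch the member list, either from the single load-balanced client or from the first
    /// reachable member in the group, record any newly discovered members, and reconnect the
    /// shared core to the leader if it has changed.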
2111    async fn refresh_members(&mut self) -> Result<()> {
2112        let leader_addr = match self.members.as_mut() {
2113            Either::Left(client) => {
2114                let resp = client
2115                    .to_owned()
2116                    .members(MembersRequest {})
2117                    .await
2118                    .map_err(RpcError::from_meta_status)?;
2119                let resp = resp.into_inner();
2120                resp.members.into_iter().find(|member| member.is_leader)
2121            }
2122            Either::Right(member_group) => {
2123                let mut fetched_members = None;
2124
2125                for (addr, client) in &mut member_group.members {
2126                    let members: Result<_> = try {
2127                        let mut client = match client {
2128                            Some(cached_client) => cached_client.to_owned(),
2129                            None => {
2130                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2131                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2132                                    .await
2133                                    .context("failed to create client")?;
2134                                let new_client: MetaMemberClient =
2135                                    MetaMemberServiceClient::new(channel);
2136                                *client = Some(new_client.clone());
2137
2138                                new_client
2139                            }
2140                        };
2141
2142                        let resp = client
2143                            .members(MembersRequest {})
2144                            .await
2145                            .context("failed to fetch members")?;
2146
2147                        resp.into_inner().members
2148                    };
2149
2150                    let fetched = members.is_ok();
2151                    fetched_members = Some(members);
2152                    if fetched {
2153                        break;
2154                    }
2155                }
2156
2157                let members = fetched_members
2158                    .context("no member available in the list")?
2159                    .context("could not refresh members")?;
2160
2161                // find new leader
2162                let mut leader = None;
2163                for member in members {
2164                    if member.is_leader {
2165                        leader = Some(member.clone());
2166                    }
2167
2168                    let addr = Self::host_address_to_uri(member.address.unwrap());
2169                    // Deliberately keep expired addresses around so they can still be tried in extreme situations.
2170                    if !member_group.members.contains(&addr) {
2171                        tracing::info!("new meta member joined: {}", addr);
2172                        member_group.members.put(addr, None);
2173                    }
2174                }
2175
2176                leader
2177            }
2178        };
2179
2180        if let Some(leader) = leader_addr {
2181            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2182
2183            if discovered_leader != self.current_leader {
2184                tracing::info!("new meta leader {} discovered", discovered_leader);
2185
2186                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2187                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2188                    false,
2189                );
2190
2191                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2192                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2193                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2194                })
2195                .await?;
2196
2197                self.recreate_core(channel).await;
2198                self.current_leader = discovered_leader;
2199            }
2200        }
2201
2202        Ok(())
2203    }
2204}
2205
2206impl GrpcMetaClient {
2207    // See `Endpoint::http2_keep_alive_interval`
2208    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2209    // See `Endpoint::keep_alive_timeout`
2210    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2211    // Retry base interval in ms for connecting to meta server.
2212    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2213    // Max retry interval in ms for connecting to meta server.
2214    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2215
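    /// Spawn a background task that refreshes the meta member list: on a periodic tick when a
    /// member group is used (`List` strategy), and otherwise only on explicit force-refresh
    /// requests. The task exits when the force-refresh channel is closed.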
2216    fn start_meta_member_monitor(
2217        &self,
2218        init_leader_addr: http::Uri,
2219        members: Either<MetaMemberClient, MetaMemberGroup>,
2220        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2221        meta_config: Arc<MetaConfig>,
2222    ) -> Result<()> {
2223        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2224        let current_leader = init_leader_addr;
2225
2226        let enable_period_tick = matches!(members, Either::Right(_));
2227
2228        let member_management = MetaMemberManagement {
2229            core_ref,
2230            members,
2231            current_leader,
2232            meta_config,
2233        };
2234
2235        let mut force_refresh_receiver = force_refresh_receiver;
2236
2237        tokio::spawn(async move {
2238            let mut member_management = member_management;
2239            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2240
2241            loop {
2242                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2243                    tokio::select! {
2244                        _ = ticker.tick() => None,
2245                        result_sender = force_refresh_receiver.recv() => {
2246                            if result_sender.is_none() {
2247                                break;
2248                            }
2249
2250                            result_sender
2251                        },
2252                    }
2253                } else {
2254                    let result_sender = force_refresh_receiver.recv().await;
2255
2256                    if result_sender.is_none() {
2257                        break;
2258                    }
2259
2260                    result_sender
2261                };
2262
2263                let tick_result = member_management.refresh_members().await;
2264                if let Err(e) = tick_result.as_ref() {
2265                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2266                }
2267
2268                if let Some(sender) = event {
2269                    // ignore resp
2270                    let _resp = sender.send(tick_result);
2271                }
2272            }
2273        });
2274
2275        Ok(())
2276    }
2277
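    /// Ask the member monitor to refresh immediately and wait for the result of that refresh.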
2278    async fn force_refresh_leader(&self) -> Result<()> {
2279        let (sender, receiver) = oneshot::channel();
2280
2281        self.member_monitor_event_sender
2282            .send(sender)
2283            .await
2284            .map_err(|e| anyhow!(e))?;
2285
2286        receiver.await.map_err(|e| anyhow!(e))?
2287    }
2288
2289    /// Connect to the meta server according to the given address `strategy`.
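    ///
    /// # Example
    ///
    /// A minimal sketch (not a doctest), assuming a meta node listening on `127.0.0.1:5690` and a
    /// default [`MetaConfig`]:
    ///
    /// ```ignore
    /// let strategy = MetaAddressStrategy::LoadBalance("http://127.0.0.1:5690".parse()?);
    /// let client = GrpcMetaClient::new(&strategy, Arc::new(MetaConfig::default())).await?;
    /// ```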
2290    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2291        let (channel, addr) = match strategy {
2292            MetaAddressStrategy::LoadBalance(addr) => {
2293                Self::try_build_rpc_channel(vec![addr.clone()]).await
2294            }
2295            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2296        }?;
2297        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2298        let client = GrpcMetaClient {
2299            member_monitor_event_sender: force_refresh_sender,
2300            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2301        };
2302
2303        let meta_member_client = client.core.read().await.meta_member_client.clone();
2304        let members = match strategy {
2305            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2306            MetaAddressStrategy::List(addrs) => {
2307                let mut members = LruCache::new(20.try_into().unwrap());
2308                for addr in addrs {
2309                    members.put(addr.clone(), None);
2310                }
2311                members.put(addr.clone(), Some(meta_member_client));
2312
2313                Either::Right(MetaMemberGroup { members })
2314            }
2315        };
2316
2317        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2318
2319        client.force_refresh_leader().await?;
2320
2321        Ok(client)
2322    }
2323
2324    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2325        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2326    }
2327
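    /// Try the given addresses in order and return the first channel that connects, together with
    /// the address that succeeded. Errors only if every address fails or none was provided.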
2328    pub(crate) async fn try_build_rpc_channel(
2329        addrs: impl IntoIterator<Item = http::Uri>,
2330    ) -> Result<(Channel, http::Uri)> {
2331        let endpoints: Vec<_> = addrs
2332            .into_iter()
2333            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2334            .collect();
2335
2336        let mut last_error = None;
2337
2338        for (endpoint, addr) in endpoints {
2339            match Self::connect_to_endpoint(endpoint).await {
2340                Ok(channel) => {
2341                    tracing::info!("Connect to meta server {} successfully", addr);
2342                    return Ok((channel, addr));
2343                }
2344                Err(e) => {
2345                    tracing::warn!(
2346                        error = %e.as_report(),
2347                        "Failed to connect to meta server {}, trying again",
2348                        addr,
2349                    );
2350                    last_error = Some(e);
2351                }
2352            }
2353        }
2354
2355        if let Some(last_error) = last_error {
2356            Err(anyhow::anyhow!(last_error)
2357                .context("failed to connect to all meta servers")
2358                .into())
2359        } else {
2360            bail!("no meta server address provided")
2361        }
2362    }
2363
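    /// Connect a single endpoint with HTTP/2 keep-alive and a 5-second connect timeout, using the
    /// monitored connector so that connection metrics are reported.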
2364    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2365        let channel = endpoint
2366            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2367            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2368            .connect_timeout(Duration::from_secs(5))
2369            .monitored_connect("grpc-meta-client", Default::default())
2370            .await?
2371            .wrapped();
2372
2373        Ok(channel)
2374    }
2375
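    /// Exponential backoff (with jitter) truncated so that the accumulated delay stays below
    /// `high_bound`; if `exceed` is true, the final interval may push the total just past the
    /// bound.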
2376    pub(crate) fn retry_strategy_to_bound(
2377        high_bound: Duration,
2378        exceed: bool,
2379    ) -> impl Iterator<Item = Duration> {
2380        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2381            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2382            .map(jitter);
2383
2384        let mut sum = Duration::default();
2385
2386        iter.take_while(move |duration| {
2387            sum += *duration;
2388
2389            if exceed {
2390                sum < high_bound + *duration
2391            } else {
2392                sum < high_bound
2393            }
2394        })
2395    }
2396}
2397
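/// Maps every meta RPC to its `{ client field, method, request type, response type }` tuple.
/// Invoking this macro with `meta_rpc_client_method_impl` (see the `impl` block at the bottom of
/// this file) generates a delegating method on [`GrpcMetaClient`] for each entry.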
2398macro_rules! for_all_meta_rpc {
2399    ($macro:ident) => {
2400        $macro! {
2401             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2402            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2403            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2404            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2405            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2406            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2407            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2408            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2409            ,{ stream_client, flush, FlushRequest, FlushResponse }
2410            ,{ stream_client, pause, PauseRequest, PauseResponse }
2411            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2412            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2413            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2414            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2415            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2416            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2417            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2418            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2419            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2420            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
2421            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2422            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2423            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
2424            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2425            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2426            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2427            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2428            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2429            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2430            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2431            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2432            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2433            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2434            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2435            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2436            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2437            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2438            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2439            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2440            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2441            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2442            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2443            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2444            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2445            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2446            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2447            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2448            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2449            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2450            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2451            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2452            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2453            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2454            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2455            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2456            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2457            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2458            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2459            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2460            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2461            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2462            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2463            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2464            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2465            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2466            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2467            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2468            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2469            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2470            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2471            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
2472            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
2473            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2474            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2475            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2476            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2477            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2478            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2479            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2480            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2481            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2482            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2483            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2484            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2485            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2486            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2487            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2488            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2489            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2490            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2491            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2492            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2493            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2494            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2495            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2496            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2497            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2498            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2499            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2500            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2501            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2502            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2503            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2504            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2505            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2506            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2507            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2508            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2509            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2510            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2511            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2512            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2513            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2514            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2515            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2516            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2517            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2518            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2519            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2520            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2521            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2522            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2523            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2524            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2525            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2526            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2527            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2528            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2529        }
2530    };
2531}
2532
2533impl GrpcMetaClient {
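    /// If an RPC failed with a status code that may indicate a leader change (`Unknown`,
    /// `Unimplemented`, or `Unavailable`), trigger a member refresh; if another refresh is already
    /// in flight, skip this one.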
2534    async fn refresh_client_if_needed(&self, code: Code) {
2535        if matches!(
2536            code,
2537            Code::Unknown | Code::Unimplemented | Code::Unavailable
2538        ) {
2539            tracing::debug!("matching tonic code {}", code);
2540            let (result_sender, result_receiver) = oneshot::channel();
2541            if self
2542                .member_monitor_event_sender
2543                .try_send(result_sender)
2544                .is_ok()
2545            {
2546                if let Ok(Err(e)) = result_receiver.await {
2547                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2548                }
2549            } else {
2550                tracing::debug!("skipping this refresh, another refresh is already in progress")
2551            }
2552        }
2553    }
2554}
2555
2556impl GrpcMetaClient {
2557    for_all_meta_rpc! { meta_rpc_client_method_impl }
2558}