risingwave_rpc_client/meta_client.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::id::{
    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, ViewId, WorkerId,
};
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::id::{ActorId, FragmentId, SourceId};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

/// Client to meta server. Cloning the instance is lightweight.
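///
/// A minimal lifecycle sketch (illustrative only; it assumes a reachable meta
/// node and pre-built `MetaAddressStrategy`, `Property`, and `MetaConfig`
/// values, whose constructors are not shown in this file; the worker type is
/// arbitrary):
///
/// ```ignore
/// async fn bring_up_worker(
///     strategy: MetaAddressStrategy,
///     addr: HostAddr,
///     property: Property,
///     config: std::sync::Arc<MetaConfig>,
/// ) {
///     // Register with the meta service; this retries on connection issues and
///     // exits the process if registration ultimately fails.
///     let (client, _system_params) =
///         MetaClient::register_new(strategy, WorkerType::ComputeNode, &addr, property, config)
///             .await;
///     // Confirm the node is ready to serve.
///     client.activate(&addr).await.unwrap();
///     // Keep the registration alive with periodic heartbeats.
///     let (_handle, _shutdown) =
///         MetaClient::start_heartbeat_loop(client.clone(), std::time::Duration::from_secs(1));
///     // ... on shutdown, best-effort unregistration:
///     client.try_unregister().await;
/// }
/// ```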
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: WorkerId,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> WorkerId {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

    /// Subscribe to notification from meta.
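    ///
    /// A sketch of consuming the returned stream (illustrative; the concrete
    /// `SubscribeType` variant depends on the caller):
    ///
    /// ```ignore
    /// let mut stream = client.subscribe(SubscribeType::Frontend).await?;
    /// while let Some(resp) = stream.message().await? {
    ///     // handle the `SubscribeResponse`, e.g. apply catalog updates
    /// }
    /// ```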
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest { secret_id };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    /// Register the current node with the cluster and set the corresponding worker id.
    ///
    /// Retry if there's a connection issue with the meta node. Exit the process if the registration fails.
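    ///
    /// Sketch of the call (arguments assumed to be constructed elsewhere):
    ///
    /// ```ignore
    /// // Never returns an error: on unrecoverable failure the process exits.
    /// let (client, params_reader) =
    ///     MetaClient::register_new(strategy, worker_type, &addr, property, config).await;
    /// // `params_reader` carries the initial system params fetched during registration.
    /// ```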
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        // Retry until reaching `max_heartbeat_interval_secs`
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Only retry if there's any transient connection issue.
            // If the error is from our implementation or business logic, do not retry it.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report panic event to meta node.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

    /// Activate the current node in the cluster to confirm it's ready to serve.
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

    /// Send heartbeat signal to meta service.
    pub async fn send_heartbeat(&self, node_id: WorkerId) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Ignore the error if we're already shutting down.
            // Otherwise, exit the process.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest { table_id, cascade };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id: job_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()> {
        let request = AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        };

        self.inner.alter_streaming_job_config(request).await?;
        Ok(())
    }

    pub async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()> {
        let request = AlterFragmentParallelismRequest {
            fragment_ids,
            parallelism,
        };

        self.inner.alter_fragment_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: JobId,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
            table_id,
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest { index_id, cascade };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    // TODO: use `UserInfoVersion` as the return type instead.
    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

    /// Unregister the current node from the cluster.
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    /// Try to unregister the current worker from the cluster on a best-effort basis and log the result.
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = %self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = %self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[WorkerId],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

    /// Starts a heartbeat worker.
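    ///
    /// Sketch of spawning and later stopping the loop (illustrative):
    ///
    /// ```ignore
    /// let (handle, shutdown_tx) =
    ///     MetaClient::start_heartbeat_loop(client.clone(), Duration::from_secs(1));
    /// // ... later, on shutdown:
    /// let _ = shutdown_tx.send(());
    /// handle.await.unwrap();
    /// ```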
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Wait for interval
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    // TODO: decide a better min_interval for the timeout
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        job_ids: &[JobId],
    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: job_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn get_fragment_vnodes(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
        let resp = self
            .inner
            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
            .await?;
        Ok(resp
            .actor_vnodes
            .into_iter()
            .map(|actor| (actor.actor_id, actor.vnode_indices))
            .collect())
    }

    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
        let resp = self
            .inner
            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
            .await?;
        Ok(resp.vnode_indices)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

1396    pub async fn risectl_update_compaction_config(
1397        &self,
1398        compaction_groups: &[CompactionGroupId],
1399        configs: &[MutableConfig],
1400    ) -> Result<()> {
1401        let req = RiseCtlUpdateCompactionConfigRequest {
1402            compaction_group_ids: compaction_groups.to_vec(),
1403            configs: configs
1404                .iter()
1405                .map(
1406                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1407                        mutable_config: Some(c.clone()),
1408                    },
1409                )
1410                .collect(),
1411        };
1412        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1413        Ok(())
1414    }
1415
1416    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1417        let req = BackupMetaRequest { remarks };
1418        let resp = self.inner.backup_meta(req).await?;
1419        Ok(resp.job_id)
1420    }
1421
1422    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1423        let req = GetBackupJobStatusRequest { job_id };
1424        let resp = self.inner.get_backup_job_status(req).await?;
1425        Ok((resp.job_status(), resp.message))
1426    }
1427
1428    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1429        let req = DeleteMetaSnapshotRequest {
1430            snapshot_ids: snapshot_ids.to_vec(),
1431        };
1432        let _resp = self.inner.delete_meta_snapshot(req).await?;
1433        Ok(())
1434    }
1435
1436    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1437        let req = GetMetaSnapshotManifestRequest {};
1438        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1439        Ok(resp.manifest.expect("should exist"))
1440    }
1441
1442    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1443        let req = GetTelemetryInfoRequest {};
1444        let resp = self.inner.get_telemetry_info(req).await?;
1445        Ok(resp)
1446    }
1447
1448    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1449        let req = GetMetaStoreInfoRequest {};
1450        let resp = self.inner.get_meta_store_info(req).await?;
1451        Ok(resp.meta_store_endpoint)
1452    }
1453
1454    pub async fn alter_sink_props(
1455        &self,
1456        sink_id: SinkId,
1457        changed_props: BTreeMap<String, String>,
1458        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1459        connector_conn_ref: Option<ConnectionId>,
1460    ) -> Result<()> {
1461        let req = AlterConnectorPropsRequest {
1462            object_id: sink_id.as_raw_id(),
1463            changed_props: changed_props.into_iter().collect(),
1464            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1465            connector_conn_ref,
1466            object_type: AlterConnectorPropsObject::Sink as i32,
1467            extra_options: None,
1468        };
1469        let _resp = self.inner.alter_connector_props(req).await?;
1470        Ok(())
1471    }
1472
1473    pub async fn alter_iceberg_table_props(
1474        &self,
1475        table_id: TableId,
1476        sink_id: SinkId,
1477        source_id: SourceId,
1478        changed_props: BTreeMap<String, String>,
1479        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1480        connector_conn_ref: Option<ConnectionId>,
1481    ) -> Result<()> {
1482        let req = AlterConnectorPropsRequest {
1483            object_id: table_id.as_raw_id(),
1484            changed_props: changed_props.into_iter().collect(),
1485            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1486            connector_conn_ref,
1487            object_type: AlterConnectorPropsObject::IcebergTable as i32,
1488            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1489                sink_id,
1490                source_id,
1491            })),
1492        };
1493        let _resp = self.inner.alter_connector_props(req).await?;
1494        Ok(())
1495    }
1496
1497    pub async fn alter_source_connector_props(
1498        &self,
1499        source_id: SourceId,
1500        changed_props: BTreeMap<String, String>,
1501        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1502        connector_conn_ref: Option<ConnectionId>,
1503    ) -> Result<()> {
1504        let req = AlterConnectorPropsRequest {
1505            object_id: source_id.as_raw_id(),
1506            changed_props: changed_props.into_iter().collect(),
1507            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1508            connector_conn_ref,
1509            object_type: AlterConnectorPropsObject::Source as i32,
1510            extra_options: None,
1511        };
1512        let _resp = self.inner.alter_connector_props(req).await?;
1513        Ok(())
1514    }
1515
1516    pub async fn alter_connection_connector_props(
1517        &self,
1518        connection_id: u32,
1519        changed_props: BTreeMap<String, String>,
1520        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1521    ) -> Result<()> {
1522        let req = AlterConnectorPropsRequest {
1523            object_id: connection_id,
1524            changed_props: changed_props.into_iter().collect(),
1525            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1526            connector_conn_ref: None, // Connections don't reference other connections
1527            object_type: AlterConnectorPropsObject::Connection as i32,
1528            extra_options: None,
1529        };
1530        let _resp = self.inner.alter_connector_props(req).await?;
1531        Ok(())
1532    }
1533
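    /// Set a system parameter on the meta node, returning the updated system parameters if the
    /// meta node reports them back.
    ///
    /// A minimal usage sketch (illustrative only; the parameter name and value are examples and
    /// `client` is an initialized `MetaClient`):
    /// ```ignore
    /// let new_params = client
    ///     .set_system_param("checkpoint_frequency".to_owned(), Some("10".to_owned()))
    ///     .await?;
    /// ```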
1534    pub async fn set_system_param(
1535        &self,
1536        param: String,
1537        value: Option<String>,
1538    ) -> Result<Option<SystemParamsReader>> {
1539        let req = SetSystemParamRequest { param, value };
1540        let resp = self.inner.set_system_param(req).await?;
1541        Ok(resp.params.map(SystemParamsReader::from))
1542    }
1543
1544    pub async fn get_session_params(&self) -> Result<String> {
1545        let req = GetSessionParamsRequest {};
1546        let resp = self.inner.get_session_params(req).await?;
1547        Ok(resp.params)
1548    }
1549
1550    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1551        let req = SetSessionParamRequest { param, value };
1552        let resp = self.inner.set_session_param(req).await?;
1553        Ok(resp.param)
1554    }
1555
1556    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1557        let req = GetDdlProgressRequest {};
1558        let resp = self.inner.get_ddl_progress(req).await?;
1559        Ok(resp.ddl_progress)
1560    }
1561
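    /// Split the given state tables out of a compaction group into a newly created group,
    /// returning the new group's id.
    ///
    /// A minimal usage sketch (illustrative only; `client`, `group_id` and `table_id` are
    /// assumed to be known to the caller, and the partition vnode count is an arbitrary example):
    /// ```ignore
    /// let new_group_id = client
    ///     .split_compaction_group(group_id, &[table_id], /* partition_vnode_count */ 0)
    ///     .await?;
    /// ```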
1562    pub async fn split_compaction_group(
1563        &self,
1564        group_id: CompactionGroupId,
1565        table_ids_to_new_group: &[TableId],
1566        partition_vnode_count: u32,
1567    ) -> Result<CompactionGroupId> {
1568        let req = SplitCompactionGroupRequest {
1569            group_id,
1570            table_ids: table_ids_to_new_group.to_vec(),
1571            partition_vnode_count,
1572        };
1573        let resp = self.inner.split_compaction_group(req).await?;
1574        Ok(resp.new_group_id)
1575    }
1576
1577    pub async fn get_tables(
1578        &self,
1579        table_ids: Vec<TableId>,
1580        include_dropped_tables: bool,
1581    ) -> Result<HashMap<TableId, Table>> {
1582        let req = GetTablesRequest {
1583            table_ids,
1584            include_dropped_tables,
1585        };
1586        let resp = self.inner.get_tables(req).await?;
1587        Ok(resp.tables)
1588    }
1589
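    /// Fetch the serving vnode mappings, keyed by fragment id. Each value pairs the id of the
    /// table that the fragment belongs to (`0` if the meta node did not report one) with the
    /// fragment's [`WorkerSlotMapping`].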
1590    pub async fn list_serving_vnode_mappings(
1591        &self,
1592    ) -> Result<HashMap<FragmentId, (u32, WorkerSlotMapping)>> {
1593        let req = GetServingVnodeMappingsRequest {};
1594        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1595        let mappings = resp
1596            .worker_slot_mappings
1597            .into_iter()
1598            .map(|p| {
1599                (
1600                    p.fragment_id,
1601                    (
1602                        resp.fragment_to_table
1603                            .get(&p.fragment_id)
1604                            .cloned()
1605                            .unwrap_or(0),
1606                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1607                    ),
1608                )
1609            })
1610            .collect();
1611        Ok(mappings)
1612    }
1613
1614    pub async fn risectl_list_compaction_status(
1615        &self,
1616    ) -> Result<(
1617        Vec<CompactStatus>,
1618        Vec<CompactTaskAssignment>,
1619        Vec<CompactTaskProgress>,
1620    )> {
1621        let req = RiseCtlListCompactionStatusRequest {};
1622        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1623        Ok((
1624            resp.compaction_statuses,
1625            resp.task_assignment,
1626            resp.task_progress,
1627        ))
1628    }
1629
1630    pub async fn get_compaction_score(
1631        &self,
1632        compaction_group_id: CompactionGroupId,
1633    ) -> Result<Vec<PickerInfo>> {
1634        let req = GetCompactionScoreRequest {
1635            compaction_group_id,
1636        };
1637        let resp = self.inner.get_compaction_score(req).await?;
1638        Ok(resp.scores)
1639    }
1640
1641    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1642        let req = RiseCtlRebuildTableStatsRequest {};
1643        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1644        Ok(())
1645    }
1646
1647    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1648        let req = ListBranchedObjectRequest {};
1649        let resp = self.inner.list_branched_object(req).await?;
1650        Ok(resp.branched_objects)
1651    }
1652
1653    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
1654        let req = ListActiveWriteLimitRequest {};
1655        let resp = self.inner.list_active_write_limit(req).await?;
1656        Ok(resp.write_limits)
1657    }
1658
1659    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1660        let req = ListHummockMetaConfigRequest {};
1661        let resp = self.inner.list_hummock_meta_config(req).await?;
1662        Ok(resp.configs)
1663    }
1664
1665    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1666        let _resp = self
1667            .inner
1668            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1669            .await?;
1670
1671        Ok(())
1672    }
1673
1674    pub async fn rw_cloud_validate_source(
1675        &self,
1676        source_type: SourceType,
1677        source_config: HashMap<String, String>,
1678    ) -> Result<RwCloudValidateSourceResponse> {
1679        let req = RwCloudValidateSourceRequest {
1680            source_type: source_type.into(),
1681            source_config,
1682        };
1683        let resp = self.inner.rw_cloud_validate_source(req).await?;
1684        Ok(resp)
1685    }
1686
1687    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1688        self.inner.core.read().await.sink_coordinate_client.clone()
1689    }
1690
1691    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1692        let req = ListCompactTaskAssignmentRequest {};
1693        let resp = self.inner.list_compact_task_assignment(req).await?;
1694        Ok(resp.task_assignment)
1695    }
1696
1697    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1698        let req = ListEventLogRequest::default();
1699        let resp = self.inner.list_event_log(req).await?;
1700        Ok(resp.event_logs)
1701    }
1702
1703    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1704        let req = ListCompactTaskProgressRequest {};
1705        let resp = self.inner.list_compact_task_progress(req).await?;
1706        Ok(resp.task_progress)
1707    }
1708
1709    #[cfg(madsim)]
1710    pub fn try_add_panic_event_blocking(
1711        &self,
1712        _panic_info: impl Display,
1713        _timeout_millis: Option<u64>,
1714    ) {
1715    }
1716
1717    /// If `timeout_millis` is `None`, a default of 1000 ms is used.
1718    #[cfg(not(madsim))]
1719    pub fn try_add_panic_event_blocking(
1720        &self,
1721        panic_info: impl Display,
1722        timeout_millis: Option<u64>,
1723    ) {
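        // Best-effort, blocking report: this may be called from contexts (e.g. a panic handler)
        // where no usable async runtime is available on the current thread, so spawn a dedicated
        // OS thread with its own current-thread Tokio runtime, bound the RPC by `timeout_millis`
        // (1000 ms by default), and deliberately ignore any error.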
1724        let event = event_log::EventWorkerNodePanic {
1725            worker_id: self.worker_id,
1726            worker_type: self.worker_type.into(),
1727            host_addr: Some(self.host_addr.to_protobuf()),
1728            panic_info: format!("{panic_info}"),
1729        };
1730        let grpc_meta_client = self.inner.clone();
1731        let _ = thread::spawn(move || {
1732            let rt = tokio::runtime::Builder::new_current_thread()
1733                .enable_all()
1734                .build()
1735                .unwrap();
1736            let req = AddEventLogRequest {
1737                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1738            };
1739            rt.block_on(async {
1740                let _ = tokio::time::timeout(
1741                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1742                    grpc_meta_client.add_event_log(req),
1743                )
1744                .await;
1745            });
1746        })
1747        .join();
1748    }
1749
1750    pub async fn add_sink_fail_event(
1751        &self,
1752        sink_id: SinkId,
1753        sink_name: String,
1754        connector: String,
1755        error: String,
1756    ) -> Result<()> {
1757        let event = event_log::EventSinkFail {
1758            sink_id,
1759            sink_name,
1760            connector,
1761            error,
1762        };
1763        let req = AddEventLogRequest {
1764            event: Some(add_event_log_request::Event::SinkFail(event)),
1765        };
1766        self.inner.add_event_log(req).await?;
1767        Ok(())
1768    }
1769
1770    pub async fn add_cdc_auto_schema_change_fail_event(
1771        &self,
1772        source_id: SourceId,
1773        table_name: String,
1774        cdc_table_id: String,
1775        upstream_ddl: String,
1776        fail_info: String,
1777    ) -> Result<()> {
1778        let event = event_log::EventAutoSchemaChangeFail {
1779            table_id: source_id.as_cdc_table_id(),
1780            table_name,
1781            cdc_table_id,
1782            upstream_ddl,
1783            fail_info,
1784        };
1785        let req = AddEventLogRequest {
1786            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1787        };
1788        self.inner.add_event_log(req).await?;
1789        Ok(())
1790    }
1791
1792    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1793        let req = CancelCompactTaskRequest {
1794            task_id,
1795            task_status: task_status as _,
1796        };
1797        let resp = self.inner.cancel_compact_task(req).await?;
1798        Ok(resp.ret)
1799    }
1800
1801    pub async fn get_version_by_epoch(
1802        &self,
1803        epoch: HummockEpoch,
1804        table_id: TableId,
1805    ) -> Result<PbHummockVersion> {
1806        let req = GetVersionByEpochRequest { epoch, table_id };
1807        let resp = self.inner.get_version_by_epoch(req).await?;
1808        Ok(resp.version.unwrap())
1809    }
1810
1811    pub async fn get_cluster_limits(
1812        &self,
1813    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1814        let req = GetClusterLimitsRequest {};
1815        let resp = self.inner.get_cluster_limits(req).await?;
1816        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1817    }
1818
1819    pub async fn merge_compaction_group(
1820        &self,
1821        left_group_id: CompactionGroupId,
1822        right_group_id: CompactionGroupId,
1823    ) -> Result<()> {
1824        let req = MergeCompactionGroupRequest {
1825            left_group_id,
1826            right_group_id,
1827        };
1828        self.inner.merge_compaction_group(req).await?;
1829        Ok(())
1830    }
1831
1832    /// List all rate limits for sources and backfills
1833    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1834        let request = ListRateLimitsRequest {};
1835        let resp = self.inner.list_rate_limits(request).await?;
1836        Ok(resp.rate_limits)
1837    }
1838
1839    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1840        let request = ListCdcProgressRequest {};
1841        let resp = self.inner.list_cdc_progress(request).await?;
1842        Ok(resp.cdc_progress)
1843    }
1844
1845    pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1846        let request = ListRefreshTableStatesRequest {};
1847        let resp = self.inner.list_refresh_table_states(request).await?;
1848        Ok(resp.states)
1849    }
1850
1851    pub async fn create_iceberg_table(
1852        &self,
1853        table_job_info: PbTableJobInfo,
1854        sink_job_info: PbSinkJobInfo,
1855        iceberg_source: PbSource,
1856        if_not_exists: bool,
1857    ) -> Result<WaitVersion> {
1858        let request = CreateIcebergTableRequest {
1859            table_info: Some(table_job_info),
1860            sink_info: Some(sink_job_info),
1861            iceberg_source: Some(iceberg_source),
1862            if_not_exists,
1863        };
1864
1865        let resp = self.inner.create_iceberg_table(request).await?;
1866        Ok(resp
1867            .version
1868            .ok_or_else(|| anyhow!("wait version not set"))?)
1869    }
1870
1871    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1872        let request = ListIcebergTablesRequest {};
1873        let resp = self.inner.list_iceberg_tables(request).await?;
1874        Ok(resp.iceberg_tables)
1875    }
1876
1877    /// Get the await tree of all nodes in the cluster.
1878    pub async fn get_cluster_stack_trace(
1879        &self,
1880        actor_traces_format: ActorTracesFormat,
1881    ) -> Result<StackTraceResponse> {
1882        let request = StackTraceRequest {
1883            actor_traces_format: actor_traces_format as i32,
1884        };
1885        let resp = self.inner.stack_trace(request).await?;
1886        Ok(resp)
1887    }
1888
1889    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
1890        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
1891        self.inner.set_sync_log_store_aligned(request).await?;
1892        Ok(())
1893    }
1894
1895    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
1896        self.inner.refresh(request).await
1897    }
1898
1899    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
1900        let request = ListUnmigratedTablesRequest {};
1901        let resp = self.inner.list_unmigrated_tables(request).await?;
1902        Ok(resp
1903            .tables
1904            .into_iter()
1905            .map(|table| (table.table_id, table.table_name))
1906            .collect())
1907    }
1908}
1909
1910#[async_trait]
1911impl HummockMetaClient for MetaClient {
1912    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1913        let req = UnpinVersionBeforeRequest {
1914            context_id: self.worker_id(),
1915            unpin_version_before: unpin_version_before.to_u64(),
1916        };
1917        self.inner.unpin_version_before(req).await?;
1918        Ok(())
1919    }
1920
1921    async fn get_current_version(&self) -> Result<HummockVersion> {
1922        let req = GetCurrentVersionRequest::default();
1923        Ok(HummockVersion::from_rpc_protobuf(
1924            &self
1925                .inner
1926                .get_current_version(req)
1927                .await?
1928                .current_version
1929                .unwrap(),
1930        ))
1931    }
1932
1933    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1934        let resp = self
1935            .inner
1936            .get_new_object_ids(GetNewObjectIdsRequest { number })
1937            .await?;
1938        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1939    }
1940
1941    async fn commit_epoch_with_change_log(
1942        &self,
1943        _epoch: HummockEpoch,
1944        _sync_result: SyncResult,
1945        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1946    ) -> Result<()> {
1947        panic!("Only meta service can commit_epoch in production.")
1948    }
1949
1950    async fn trigger_manual_compaction(
1951        &self,
1952        compaction_group_id: u64,
1953        table_id: JobId,
1954        level: u32,
1955        sst_ids: Vec<u64>,
1956    ) -> Result<()> {
1957        // TODO: support key_range parameter
1958        let req = TriggerManualCompactionRequest {
1959            compaction_group_id,
1960            table_id,
1961            // If `table_id` does not exist, manual compaction will include all SSTs
1962            // without checking `internal_table_id`.
1963            level,
1964            sst_ids,
1965            ..Default::default()
1966        };
1967
1968        self.inner.trigger_manual_compaction(req).await?;
1969        Ok(())
1970    }
1971
1972    async fn trigger_full_gc(
1973        &self,
1974        sst_retention_time_sec: u64,
1975        prefix: Option<String>,
1976    ) -> Result<()> {
1977        self.inner
1978            .trigger_full_gc(TriggerFullGcRequest {
1979                sst_retention_time_sec,
1980                prefix,
1981            })
1982            .await?;
1983        Ok(())
1984    }
1985
1986    async fn subscribe_compaction_event(
1987        &self,
1988    ) -> Result<(
1989        UnboundedSender<SubscribeCompactionEventRequest>,
1990        BoxStream<'static, CompactionEventItem>,
1991    )> {
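        // The compaction event channel is bidirectional: the client first sends a `Register`
        // event carrying its worker id, then keeps the returned sender for follow-up requests
        // while consuming server-pushed compaction events from the returned stream.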
1992        let (request_sender, request_receiver) =
1993            unbounded_channel::<SubscribeCompactionEventRequest>();
1994        request_sender
1995            .send(SubscribeCompactionEventRequest {
1996                event: Some(subscribe_compaction_event_request::Event::Register(
1997                    Register {
1998                        context_id: self.worker_id(),
1999                    },
2000                )),
2001                create_at: SystemTime::now()
2002                    .duration_since(SystemTime::UNIX_EPOCH)
2003                    .expect("Clock may have gone backwards")
2004                    .as_millis() as u64,
2005            })
2006            .context("Failed to subscribe compaction event")?;
2007
2008        let stream = self
2009            .inner
2010            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
2011                request_receiver,
2012            )))
2013            .await?;
2014
2015        Ok((request_sender, Box::pin(stream)))
2016    }
2017
2018    async fn get_version_by_epoch(
2019        &self,
2020        epoch: HummockEpoch,
2021        table_id: TableId,
2022    ) -> Result<PbHummockVersion> {
2023        self.get_version_by_epoch(epoch, table_id).await
2024    }
2025
2026    async fn subscribe_iceberg_compaction_event(
2027        &self,
2028    ) -> Result<(
2029        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2030        BoxStream<'static, IcebergCompactionEventItem>,
2031    )> {
2032        let (request_sender, request_receiver) =
2033            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2034        request_sender
2035            .send(SubscribeIcebergCompactionEventRequest {
2036                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2037                    IcebergRegister {
2038                        context_id: self.worker_id(),
2039                    },
2040                )),
2041                create_at: SystemTime::now()
2042                    .duration_since(SystemTime::UNIX_EPOCH)
2043                    .expect("Clock may have gone backwards")
2044                    .as_millis() as u64,
2045            })
2046            .context("Failed to subscribe iceberg compaction event")?;
2047
2048        let stream = self
2049            .inner
2050            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2051                request_receiver,
2052            )))
2053            .await?;
2054
2055        Ok((request_sender, Box::pin(stream)))
2056    }
2057}
2058
2059#[async_trait]
2060impl TelemetryInfoFetcher for MetaClient {
2061    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2062        let resp = self
2063            .get_telemetry_info()
2064            .await
2065            .map_err(|e| e.to_report_string())?;
2066        let tracking_id = resp.get_tracking_id().ok();
2067        Ok(tracking_id.map(|id| id.to_owned()))
2068    }
2069}
2070
2071pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2072
2073#[derive(Debug, Clone)]
2074struct GrpcMetaClientCore {
2075    cluster_client: ClusterServiceClient<Channel>,
2076    meta_member_client: MetaMemberServiceClient<Channel>,
2077    heartbeat_client: HeartbeatServiceClient<Channel>,
2078    ddl_client: DdlServiceClient<Channel>,
2079    hummock_client: HummockManagerServiceClient<Channel>,
2080    notification_client: NotificationServiceClient<Channel>,
2081    stream_client: StreamManagerServiceClient<Channel>,
2082    user_client: UserServiceClient<Channel>,
2083    scale_client: ScaleServiceClient<Channel>,
2084    backup_client: BackupServiceClient<Channel>,
2085    telemetry_client: TelemetryInfoServiceClient<Channel>,
2086    system_params_client: SystemParamsServiceClient<Channel>,
2087    session_params_client: SessionParamServiceClient<Channel>,
2088    serving_client: ServingServiceClient<Channel>,
2089    cloud_client: CloudServiceClient<Channel>,
2090    sink_coordinate_client: SinkCoordinationRpcClient,
2091    event_log_client: EventLogServiceClient<Channel>,
2092    cluster_limit_client: ClusterLimitServiceClient<Channel>,
2093    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
2094    monitor_client: MonitorServiceClient<Channel>,
2095}
2096
2097impl GrpcMetaClientCore {
2098    pub(crate) fn new(channel: Channel) -> Self {
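        // All service clients share the same underlying channel. Clients that may receive large
        // responses (DDL, hummock, notification, stream, scale, telemetry and sink coordination)
        // lift tonic's default 4 MiB decoding limit to `usize::MAX`.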
2099        let cluster_client = ClusterServiceClient::new(channel.clone());
2100        let meta_member_client = MetaMemberClient::new(channel.clone());
2101        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2102        let ddl_client =
2103            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2104        let hummock_client =
2105            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2106        let notification_client =
2107            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2108        let stream_client =
2109            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2110        let user_client = UserServiceClient::new(channel.clone());
2111        let scale_client =
2112            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2113        let backup_client = BackupServiceClient::new(channel.clone());
2114        let telemetry_client =
2115            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2116        let system_params_client = SystemParamsServiceClient::new(channel.clone());
2117        let session_params_client = SessionParamServiceClient::new(channel.clone());
2118        let serving_client = ServingServiceClient::new(channel.clone());
2119        let cloud_client = CloudServiceClient::new(channel.clone());
2120        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2121            .max_decoding_message_size(usize::MAX);
2122        let event_log_client = EventLogServiceClient::new(channel.clone());
2123        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2124        let hosted_iceberg_catalog_service_client =
2125            HostedIcebergCatalogServiceClient::new(channel.clone());
2126        let monitor_client = MonitorServiceClient::new(channel);
2127
2128        GrpcMetaClientCore {
2129            cluster_client,
2130            meta_member_client,
2131            heartbeat_client,
2132            ddl_client,
2133            hummock_client,
2134            notification_client,
2135            stream_client,
2136            user_client,
2137            scale_client,
2138            backup_client,
2139            telemetry_client,
2140            system_params_client,
2141            session_params_client,
2142            serving_client,
2143            cloud_client,
2144            sink_coordinate_client,
2145            event_log_client,
2146            cluster_limit_client,
2147            hosted_iceberg_catalog_service_client,
2148            monitor_client,
2149        }
2150    }
2151}
2152
2153/// Client to meta server. Cloning the instance is lightweight.
2154///
2155/// It is a wrapper over the tonic service clients. See [`crate::meta_rpc_client_method_impl`].
2156#[derive(Debug, Clone)]
2157struct GrpcMetaClient {
2158    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
2159    core: Arc<RwLock<GrpcMetaClientCore>>,
2160}
2161
2162type MetaMemberClient = MetaMemberServiceClient<Channel>;
2163
2164struct MetaMemberGroup {
2165    members: LruCache<http::Uri, Option<MetaMemberClient>>,
2166}
2167
2168struct MetaMemberManagement {
2169    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
2170    members: Either<MetaMemberClient, MetaMemberGroup>,
2171    current_leader: http::Uri,
2172    meta_config: Arc<MetaConfig>,
2173}
2174
2175impl MetaMemberManagement {
2176    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2177
2178    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2179        format!("http://{}:{}", addr.host, addr.port)
2180            .parse()
2181            .unwrap()
2182    }
2183
2184    async fn recreate_core(&self, channel: Channel) {
2185        let mut core = self.core_ref.write().await;
2186        *core = GrpcMetaClientCore::new(channel);
2187    }
2188
2189    async fn refresh_members(&mut self) -> Result<()> {
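        // Discover the current meta leader. With a load-balanced address there is a single
        // member client to ask; with an address list, probe the cached members one by one until
        // any of them answers, caching newly reported members along the way. If the discovered
        // leader differs from the one currently in use, reconnect and swap the underlying
        // clients.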
2190        let leader_addr = match self.members.as_mut() {
2191            Either::Left(client) => {
2192                let resp = client
2193                    .to_owned()
2194                    .members(MembersRequest {})
2195                    .await
2196                    .map_err(RpcError::from_meta_status)?;
2197                let resp = resp.into_inner();
2198                resp.members.into_iter().find(|member| member.is_leader)
2199            }
2200            Either::Right(member_group) => {
2201                let mut fetched_members = None;
2202
2203                for (addr, client) in &mut member_group.members {
2204                    let members: Result<_> = try {
2205                        let mut client = match client {
2206                            Some(cached_client) => cached_client.to_owned(),
2207                            None => {
2208                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2209                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2210                                    .await
2211                                    .context("failed to create client")?;
2212                                let new_client: MetaMemberClient =
2213                                    MetaMemberServiceClient::new(channel);
2214                                *client = Some(new_client.clone());
2215
2216                                new_client
2217                            }
2218                        };
2219
2220                        let resp = client
2221                            .members(MembersRequest {})
2222                            .await
2223                            .context("failed to fetch members")?;
2224
2225                        resp.into_inner().members
2226                    };
2227
2228                    let fetched = members.is_ok();
2229                    fetched_members = Some(members);
2230                    if fetched {
2231                        break;
2232                    }
2233                }
2234
2235                let members = fetched_members
2236                    .context("no member available in the list")?
2237                    .context("could not refresh members")?;
2238
2239                // find new leader
2240                let mut leader = None;
2241                for member in members {
2242                    if member.is_leader {
2243                        leader = Some(member.clone());
2244                    }
2245
2246                    let addr = Self::host_address_to_uri(member.address.unwrap());
2247                    // Expired addresses are intentionally not cleaned up here, so they remain available as fallbacks in extreme situations.
2248                    if !member_group.members.contains(&addr) {
2249                        tracing::info!("new meta member joined: {}", addr);
2250                        member_group.members.put(addr, None);
2251                    }
2252                }
2253
2254                leader
2255            }
2256        };
2257
2258        if let Some(leader) = leader_addr {
2259            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2260
2261            if discovered_leader != self.current_leader {
2262                tracing::info!("new meta leader {} discovered", discovered_leader);
2263
2264                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2265                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2266                    false,
2267                );
2268
2269                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2270                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2271                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2272                })
2273                .await?;
2274
2275                self.recreate_core(channel).await;
2276                self.current_leader = discovered_leader;
2277            }
2278        }
2279
2280        Ok(())
2281    }
2282}
2283
2284impl GrpcMetaClient {
2285    // See `Endpoint::http2_keep_alive_interval`
2286    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2287    // See `Endpoint::keep_alive_timeout`
2288    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2289    // Retry base interval in ms for connecting to meta server.
2290    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2291    // Retry max interval in ms for connecting to meta server.
2292    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2293
2294    fn start_meta_member_monitor(
2295        &self,
2296        init_leader_addr: http::Uri,
2297        members: Either<MetaMemberClient, MetaMemberGroup>,
2298        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2299        meta_config: Arc<MetaConfig>,
2300    ) -> Result<()> {
2301        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2302        let current_leader = init_leader_addr;
2303
2304        let enable_period_tick = matches!(members, Either::Right(_));
2305
2306        let member_management = MetaMemberManagement {
2307            core_ref,
2308            members,
2309            current_leader,
2310            meta_config,
2311        };
2312
2313        let mut force_refresh_receiver = force_refresh_receiver;
2314
2315        tokio::spawn(async move {
2316            let mut member_management = member_management;
2317            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2318
2319            loop {
2320                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2321                    tokio::select! {
2322                        _ = ticker.tick() => None,
2323                        result_sender = force_refresh_receiver.recv() => {
2324                            if result_sender.is_none() {
2325                                break;
2326                            }
2327
2328                            result_sender
2329                        },
2330                    }
2331                } else {
2332                    let result_sender = force_refresh_receiver.recv().await;
2333
2334                    if result_sender.is_none() {
2335                        break;
2336                    }
2337
2338                    result_sender
2339                };
2340
2341                let tick_result = member_management.refresh_members().await;
2342                if let Err(e) = tick_result.as_ref() {
2343                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2344                }
2345
2346                if let Some(sender) = event {
2347                    // Ignore the error if the requester has already given up waiting.
2348                    let _resp = sender.send(tick_result);
2349                }
2350            }
2351        });
2352
2353        Ok(())
2354    }
2355
2356    async fn force_refresh_leader(&self) -> Result<()> {
2357        let (sender, receiver) = oneshot::channel();
2358
2359        self.member_monitor_event_sender
2360            .send(sender)
2361            .await
2362            .map_err(|e| anyhow!(e))?;
2363
2364        receiver.await.map_err(|e| anyhow!(e))?
2365    }
2366
2367    /// Connect to the meta server using the given address `strategy`.
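    ///
    /// The bootstrap sequence is: build a channel to any reachable address, spawn the meta
    /// member monitor, then force one leader refresh so that the returned client already talks
    /// to the current leader.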
2368    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2369        let (channel, addr) = match strategy {
2370            MetaAddressStrategy::LoadBalance(addr) => {
2371                Self::try_build_rpc_channel(vec![addr.clone()]).await
2372            }
2373            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2374        }?;
2375        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2376        let client = GrpcMetaClient {
2377            member_monitor_event_sender: force_refresh_sender,
2378            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2379        };
2380
2381        let meta_member_client = client.core.read().await.meta_member_client.clone();
2382        let members = match strategy {
2383            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2384            MetaAddressStrategy::List(addrs) => {
2385                let mut members = LruCache::new(20.try_into().unwrap());
2386                for addr in addrs {
2387                    members.put(addr.clone(), None);
2388                }
2389                members.put(addr.clone(), Some(meta_member_client));
2390
2391                Either::Right(MetaMemberGroup { members })
2392            }
2393        };
2394
2395        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2396
2397        client.force_refresh_leader().await?;
2398
2399        Ok(client)
2400    }
2401
2402    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2403        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2404    }
2405
2406    pub(crate) async fn try_build_rpc_channel(
2407        addrs: impl IntoIterator<Item = http::Uri>,
2408    ) -> Result<(Channel, http::Uri)> {
2409        let endpoints: Vec<_> = addrs
2410            .into_iter()
2411            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2412            .collect();
2413
2414        let mut last_error = None;
2415
2416        for (endpoint, addr) in endpoints {
2417            match Self::connect_to_endpoint(endpoint).await {
2418                Ok(channel) => {
2419                    tracing::info!("Connected to meta server {} successfully", addr);
2420                    return Ok((channel, addr));
2421                }
2422                Err(e) => {
2423                    tracing::warn!(
2424                        error = %e.as_report(),
2425                        "Failed to connect to meta server {}, trying the next address",
2426                        addr,
2427                    );
2428                    last_error = Some(e);
2429                }
2430            }
2431        }
2432
2433        if let Some(last_error) = last_error {
2434            Err(anyhow::anyhow!(last_error)
2435                .context("failed to connect to all meta servers")
2436                .into())
2437        } else {
2438            bail!("no meta server address provided")
2439        }
2440    }
2441
2442    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2443        let channel = endpoint
2444            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2445            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2446            .connect_timeout(Duration::from_secs(5))
2447            .monitored_connect("grpc-meta-client", Default::default())
2448            .await?
2449            .wrapped();
2450
2451        Ok(channel)
2452    }
2453
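    /// Build an exponential-backoff-with-jitter retry strategy whose accumulated delay stays
    /// within `high_bound`. With `exceed: true`, the attempt that crosses the bound is still
    /// allowed; with `exceed: false`, the strategy stops strictly before reaching it.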
2454    pub(crate) fn retry_strategy_to_bound(
2455        high_bound: Duration,
2456        exceed: bool,
2457    ) -> impl Iterator<Item = Duration> {
2458        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2459            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2460            .map(jitter);
2461
2462        let mut sum = Duration::default();
2463
2464        iter.take_while(move |duration| {
2465            sum += *duration;
2466
2467            if exceed {
2468                sum < high_bound + *duration
2469            } else {
2470                sum < high_bound
2471            }
2472        })
2473    }
2474}
2475
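/// Enumerates every meta RPC as `{ client field, method name, request type, response type }`.
/// Feeding this list to another macro (see the `meta_rpc_client_method_impl` invocation at the
/// bottom of this file) generates one delegating method on `GrpcMetaClient` per entry.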
2476macro_rules! for_all_meta_rpc {
2477    ($macro:ident) => {
2478        $macro! {
2479             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2480            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2481            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2482            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2483            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2484            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2485            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2486            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2487            ,{ stream_client, flush, FlushRequest, FlushResponse }
2488            ,{ stream_client, pause, PauseRequest, PauseResponse }
2489            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2490            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2491            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2492            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2493            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2494            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2495            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2496            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2497            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2498            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
2499            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2500            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2501            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
2502            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
2503            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2504            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2505            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
2506            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
2507            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2508            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2509            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
2510            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2511            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2512            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2513            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2514            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2515            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
2516            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
2517            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2518            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2519            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2520            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2521            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2522            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2523            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2524            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2525            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2526            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2527            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2528            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2529            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2530            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2531            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2532            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2533            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2534            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2535            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2536            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2537            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2538            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2539            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2540            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2541            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2542            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2543            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2544            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2545            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2546            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2547            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2548            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2549            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2550            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2551            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2552            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2553            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2554            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2555            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
2556            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
2557            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2558            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2559            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2560            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2561            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2562            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2563            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2564            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2565            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2566            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2567            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2568            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2569            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2570            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2571            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2572            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2573            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2574            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2575            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2576            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2577            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2578            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2579            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2580            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2581            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2582            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2583            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2584            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2585            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2586            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2587            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2588            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2589            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2590            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2591            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2592            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2593            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2594            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2595            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2596            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2597            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2598            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2599            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2600            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2601            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2602            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2603            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2604            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2605            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2606            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2607            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2608            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2609            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2610            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2611            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2612            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2613        }
2614    };
2615}
2616
2617impl GrpcMetaClient {
2618    async fn refresh_client_if_needed(&self, code: Code) {
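        // `Unknown`, `Unimplemented` and `Unavailable` usually mean the current channel points
        // at a stale or unreachable meta leader rather than an application-level error, so ask
        // the member monitor (best effort, non-blocking) to re-discover the leader.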
2619        if matches!(
2620            code,
2621            Code::Unknown | Code::Unimplemented | Code::Unavailable
2622        ) {
2623            tracing::debug!("tonic code {} triggers a meta client refresh", code);
2624            let (result_sender, result_receiver) = oneshot::channel();
2625            if self
2626                .member_monitor_event_sender
2627                .try_send(result_sender)
2628                .is_ok()
2629            {
2630                if let Ok(Err(e)) = result_receiver.await {
2631                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2632                }
2633            } else {
2634                tracing::debug!("skipping this refresh, another refresh is already in progress")
2635            }
2636        }
2637    }
2638}
2639
2640impl GrpcMetaClient {
2641    for_all_meta_rpc! { meta_rpc_client_method_impl }
2642}