risingwave_rpc_client/meta_client.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::id::{
    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, UserId, ViewId, WorkerId,
};
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::id::{ActorId, FragmentId, HummockSstableId, SourceId};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

/// Client to meta server. Cloning the instance is lightweight.
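///
/// A minimal sketch of what lightweight cloning enables (illustrative only; it assumes an
/// already-registered `meta_client: MetaClient` is in scope):
///
/// ```ignore
/// // Clones share the underlying gRPC client, so handing one to a task is cheap.
/// let client = meta_client.clone();
/// tokio::spawn(async move {
///     if let Err(e) = client.send_heartbeat().await {
///         tracing::warn!(error = %e.as_report(), "heartbeat failed");
///     }
/// });
/// ```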
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: WorkerId,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> WorkerId {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

    /// Subscribe to notifications from meta.
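    ///
    /// A sketch of how a caller might consume the returned stream (illustrative only; the
    /// subscribe type and the hypothetical `handle_notification` are assumptions, not
    /// prescribed usage):
    ///
    /// ```ignore
    /// let mut stream = meta_client.subscribe(SubscribeType::Frontend).await?;
    /// // `tonic::Streaming` yields messages until the server closes the stream.
    /// while let Some(resp) = stream.message().await? {
    ///     handle_notification(resp);
    /// }
    /// ```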
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSecretRequest { secret_id, cascade };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    /// Register the current node to the cluster and set the corresponding worker id.
    ///
    /// Retry if there's a connection issue with the meta node. Exit the process if the registration fails.
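    ///
    /// A sketch of a typical bootstrap sequence built on this method (illustrative only; the
    /// address strategy, worker property, and config values are placeholders):
    ///
    /// ```ignore
    /// let (meta_client, system_params) = MetaClient::register_new(
    ///     meta_addr_strategy,      // e.g. parsed from the configured meta address
    ///     WorkerType::ComputeNode,
    ///     &host_addr,
    ///     worker_property,
    ///     meta_config,
    /// )
    /// .await;
    /// // Confirm readiness once the node can actually serve requests.
    /// meta_client.activate(&host_addr).await?;
    /// ```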
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        // Retry until reaching `max_heartbeat_interval_secs`
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Only retry if there's any transient connection issue.
            // If the error is from our implementation or business, do not retry it.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report panic event to meta node.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

    /// Activate the current node in the cluster to confirm it's ready to serve.
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

    /// Send heartbeat signal to meta service.
    pub async fn send_heartbeat(&self) -> Result<()> {
        let request = HeartbeatRequest {
            node_id: self.worker_id,
            resource: Some(risingwave_pb::common::worker_node::Resource {
                rw_version: RW_VERSION.to_owned(),
                total_memory_bytes: system_memory_available_bytes() as _,
                total_cpu_cores: total_cpu_available() as _,
                hostname: hostname(),
            }),
        };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Ignore the error if we're already shutting down.
            // Otherwise, exit the process.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            resource_type: Some(PbStreamingJobResourceType {
                resource_type: Some(resource_type),
            }),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest { table_id, cascade };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> Result<WaitVersion> {
        let request = AlterSubscriptionRetentionRequest {
            subscription_id,
            retention_seconds,
            definition,
        };
        let resp = self.inner.alter_subscription_retention(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id: job_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<PbTableParallelism>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterBackfillParallelismRequest {
            table_id: job_id,
            parallelism,
            deferred,
        };

        self.inner.alter_backfill_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()> {
        let request = AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        };

        self.inner.alter_streaming_job_config(request).await?;
        Ok(())
    }

    pub async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()> {
        let request = AlterFragmentParallelismRequest {
            fragment_ids,
            parallelism,
        };

        self.inner.alter_fragment_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: JobId,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
            table_id,
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
        let request = ResetSourceRequest { source_id };
        let resp = self.inner.reset_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest { index_id, cascade };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    // TODO: use `UserInfoVersion` as the return type instead.
    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: UserId) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: UserId,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        granted_by: UserId,
        revoke_by: UserId,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<UserId>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: UserId,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

    /// Unregister the current node from the cluster.
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    /// Try to unregister the current worker from the cluster on a best-effort basis, and log the result.
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = %self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = %self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[WorkerId],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

    /// Starts a heartbeat worker.
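    ///
    /// Returns the task handle and a shutdown sender. A sketch of the intended lifecycle
    /// (illustrative only; the interval and shutdown ordering are assumptions):
    ///
    /// ```ignore
    /// let (heartbeat_handle, heartbeat_shutdown) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    /// // ... run the node ...
    /// // On shutdown: stop heartbeats, then best-effort unregister from the meta service.
    /// let _ = heartbeat_shutdown.send(());
    /// heartbeat_handle.await.ok();
    /// meta_client.try_unregister().await;
    /// ```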
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Wait for interval
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    // TODO: decide a better timeout based on `min_interval`
                    min_interval * 3,
                    meta_client.send_heartbeat(),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn risectl_resume_backfill(
        &self,
        request: RisectlResumeBackfillRequest,
    ) -> Result<()> {
        self.inner.risectl_resume_backfill(request).await?;
        Ok(())
    }

    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(resp.hummock_version_id)
    }

    pub async fn wait(&self) -> Result<WaitVersion> {
        let request = WaitRequest {};
        let resp = self.inner.wait(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        job_ids: &[JobId],
    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: job_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(
        &self,
        include_node: bool,
    ) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {
                include_node: Some(include_node),
            })
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {
                include_node: Some(true),
            })
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn get_fragment_vnodes(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
        let resp = self
            .inner
            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
            .await?;
        Ok(resp
            .actor_vnodes
            .into_iter()
            .map(|actor| (actor.actor_id, actor.vnode_indices))
            .collect())
    }

    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
        let resp = self
            .inner
            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
            .await?;
        Ok(resp.vnode_indices)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        throttle_target: PbThrottleTarget,
        throttle_type: risingwave_pb::common::PbThrottleType,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            throttle_target: throttle_target as i32,
            throttle_type: throttle_type as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id,
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id,
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    pub async fn alter_sink_props(
        &self,
        sink_id: SinkId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_iceberg_table_props(
        &self,
        table_id: TableId,
        sink_id: SinkId,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: table_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::IcebergTable as i32,
            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
                sink_id,
                source_id,
            })),
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_connection_connector_props(
        &self,
        connection_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: connection_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref: None, // Connections don't reference other connections
            object_type: AlterConnectorPropsObject::Connection as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

1593    /// Orchestrates a source property update with a pause/update/resume workflow.
1594    /// This is the "safe" variant: it pauses the source before applying the update and resumes it afterwards.
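    ///
    /// A minimal usage sketch (not compiled; `meta_client`, `source_id`, `props` and
    /// `secret_refs` are assumed to be set up by the caller):
    ///
    /// ```ignore
    /// // Pause the source, apply the new properties and secret refs, then resume it.
    /// meta_client
    ///     .alter_source_properties_safe(source_id, props, secret_refs, /* reset_splits */ false)
    ///     .await?;
    /// ```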
1595    pub async fn alter_source_properties_safe(
1596        &self,
1597        source_id: SourceId,
1598        changed_props: BTreeMap<String, String>,
1599        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1600        reset_splits: bool,
1601    ) -> Result<()> {
1602        let req = AlterSourcePropertiesSafeRequest {
1603            source_id: source_id.as_raw_id(),
1604            changed_props: changed_props.into_iter().collect(),
1605            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1606            options: Some(PropertyUpdateOptions { reset_splits }),
1607        };
1608        let _resp = self.inner.alter_source_properties_safe(req).await?;
1609        Ok(())
1610    }
1611
1612    /// Reset source split assignments (UNSAFE - admin only).
1613    /// This clears persisted split metadata and triggers re-discovery.
1614    pub async fn reset_source_splits(&self, source_id: SourceId) -> Result<()> {
1615        let req = ResetSourceSplitsRequest {
1616            source_id: source_id.as_raw_id(),
1617        };
1618        let _resp = self.inner.reset_source_splits(req).await?;
1619        Ok(())
1620    }
1621
1622    /// Inject specific offsets into source splits (UNSAFE - admin only).
1623    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
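    ///
    /// A minimal usage sketch (not compiled; `meta_client` and `source_id` are assumed, and the
    /// split id and offset below are purely illustrative):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// // Offsets are keyed by split id.
    /// let split_offsets = HashMap::from([("split-0".to_owned(), "42".to_owned())]);
    /// let applied_split_ids = meta_client.inject_source_offsets(source_id, split_offsets).await?;
    /// ```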
1624    pub async fn inject_source_offsets(
1625        &self,
1626        source_id: SourceId,
1627        split_offsets: HashMap<String, String>,
1628    ) -> Result<Vec<String>> {
1629        let req = InjectSourceOffsetsRequest {
1630            source_id: source_id.as_raw_id(),
1631            split_offsets,
1632        };
1633        let resp = self.inner.inject_source_offsets(req).await?;
1634        Ok(resp.applied_split_ids)
1635    }
1636
1637    pub async fn set_system_param(
1638        &self,
1639        param: String,
1640        value: Option<String>,
1641    ) -> Result<Option<SystemParamsReader>> {
1642        let req = SetSystemParamRequest { param, value };
1643        let resp = self.inner.set_system_param(req).await?;
1644        Ok(resp.params.map(SystemParamsReader::from))
1645    }
1646
1647    pub async fn get_session_params(&self) -> Result<String> {
1648        let req = GetSessionParamsRequest {};
1649        let resp = self.inner.get_session_params(req).await?;
1650        Ok(resp.params)
1651    }
1652
1653    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1654        let req = SetSessionParamRequest { param, value };
1655        let resp = self.inner.set_session_param(req).await?;
1656        Ok(resp.param)
1657    }
1658
1659    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1660        let req = GetDdlProgressRequest {};
1661        let resp = self.inner.get_ddl_progress(req).await?;
1662        Ok(resp.ddl_progress)
1663    }
1664
1665    pub async fn split_compaction_group(
1666        &self,
1667        group_id: CompactionGroupId,
1668        table_ids_to_new_group: &[TableId],
1669        partition_vnode_count: u32,
1670    ) -> Result<CompactionGroupId> {
1671        let req = SplitCompactionGroupRequest {
1672            group_id,
1673            table_ids: table_ids_to_new_group.to_vec(),
1674            partition_vnode_count,
1675        };
1676        let resp = self.inner.split_compaction_group(req).await?;
1677        Ok(resp.new_group_id)
1678    }
1679
1680    pub async fn get_tables(
1681        &self,
1682        table_ids: Vec<TableId>,
1683        include_dropped_tables: bool,
1684    ) -> Result<HashMap<TableId, Table>> {
1685        let req = GetTablesRequest {
1686            table_ids,
1687            include_dropped_tables,
1688        };
1689        let resp = self.inner.get_tables(req).await?;
1690        Ok(resp.tables)
1691    }
1692
1693    pub async fn list_serving_vnode_mappings(
1694        &self,
1695    ) -> Result<HashMap<FragmentId, (JobId, WorkerSlotMapping)>> {
1696        let req = GetServingVnodeMappingsRequest {};
1697        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1698        let mappings = resp
1699            .worker_slot_mappings
1700            .into_iter()
1701            .map(|p| {
1702                (
1703                    p.fragment_id,
1704                    (
1705                        resp.fragment_to_table
1706                            .get(&p.fragment_id)
1707                            .cloned()
1708                            .unwrap_or_default(),
1709                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1710                    ),
1711                )
1712            })
1713            .collect();
1714        Ok(mappings)
1715    }
1716
1717    pub async fn risectl_list_compaction_status(
1718        &self,
1719    ) -> Result<(
1720        Vec<CompactStatus>,
1721        Vec<CompactTaskAssignment>,
1722        Vec<CompactTaskProgress>,
1723    )> {
1724        let req = RiseCtlListCompactionStatusRequest {};
1725        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1726        Ok((
1727            resp.compaction_statuses,
1728            resp.task_assignment,
1729            resp.task_progress,
1730        ))
1731    }
1732
1733    pub async fn get_compaction_score(
1734        &self,
1735        compaction_group_id: CompactionGroupId,
1736    ) -> Result<Vec<PickerInfo>> {
1737        let req = GetCompactionScoreRequest {
1738            compaction_group_id,
1739        };
1740        let resp = self.inner.get_compaction_score(req).await?;
1741        Ok(resp.scores)
1742    }
1743
1744    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1745        let req = RiseCtlRebuildTableStatsRequest {};
1746        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1747        Ok(())
1748    }
1749
1750    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1751        let req = ListBranchedObjectRequest {};
1752        let resp = self.inner.list_branched_object(req).await?;
1753        Ok(resp.branched_objects)
1754    }
1755
1756    pub async fn list_active_write_limit(&self) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
1757        let req = ListActiveWriteLimitRequest {};
1758        let resp = self.inner.list_active_write_limit(req).await?;
1759        Ok(resp.write_limits)
1760    }
1761
1762    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1763        let req = ListHummockMetaConfigRequest {};
1764        let resp = self.inner.list_hummock_meta_config(req).await?;
1765        Ok(resp.configs)
1766    }
1767
1768    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1769        let _resp = self
1770            .inner
1771            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1772            .await?;
1773
1774        Ok(())
1775    }
1776
1777    pub async fn rw_cloud_validate_source(
1778        &self,
1779        source_type: SourceType,
1780        source_config: HashMap<String, String>,
1781    ) -> Result<RwCloudValidateSourceResponse> {
1782        let req = RwCloudValidateSourceRequest {
1783            source_type: source_type.into(),
1784            source_config,
1785        };
1786        let resp = self.inner.rw_cloud_validate_source(req).await?;
1787        Ok(resp)
1788    }
1789
1790    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1791        self.inner.core.read().await.sink_coordinate_client.clone()
1792    }
1793
1794    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1795        let req = ListCompactTaskAssignmentRequest {};
1796        let resp = self.inner.list_compact_task_assignment(req).await?;
1797        Ok(resp.task_assignment)
1798    }
1799
1800    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1801        let req = ListEventLogRequest::default();
1802        let resp = self.inner.list_event_log(req).await?;
1803        Ok(resp.event_logs)
1804    }
1805
1806    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1807        let req = ListCompactTaskProgressRequest {};
1808        let resp = self.inner.list_compact_task_progress(req).await?;
1809        Ok(resp.task_progress)
1810    }
1811
1812    #[cfg(madsim)]
1813    pub fn try_add_panic_event_blocking(
1814        &self,
1815        panic_info: impl Display,
1816        timeout_millis: Option<u64>,
1817    ) {
1818    }
1819
1820    /// If `timeout_millis` is `None`, a default of 1000 ms is used.
1821    #[cfg(not(madsim))]
1822    pub fn try_add_panic_event_blocking(
1823        &self,
1824        panic_info: impl Display,
1825        timeout_millis: Option<u64>,
1826    ) {
1827        let event = event_log::EventWorkerNodePanic {
1828            worker_id: self.worker_id,
1829            worker_type: self.worker_type.into(),
1830            host_addr: Some(self.host_addr.to_protobuf()),
1831            panic_info: format!("{panic_info}"),
1832        };
1833        let grpc_meta_client = self.inner.clone();
1834        let _ = thread::spawn(move || {
1835            let rt = tokio::runtime::Builder::new_current_thread()
1836                .enable_all()
1837                .build()
1838                .unwrap();
1839            let req = AddEventLogRequest {
1840                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1841            };
1842            rt.block_on(async {
1843                let _ = tokio::time::timeout(
1844                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1845                    grpc_meta_client.add_event_log(req),
1846                )
1847                .await;
1848            });
1849        })
1850        .join();
1851    }
1852
1853    pub async fn add_sink_fail_event(
1854        &self,
1855        sink_id: SinkId,
1856        sink_name: String,
1857        connector: String,
1858        error: String,
1859    ) -> Result<()> {
1860        let event = event_log::EventSinkFail {
1861            sink_id,
1862            sink_name,
1863            connector,
1864            error,
1865        };
1866        let req = AddEventLogRequest {
1867            event: Some(add_event_log_request::Event::SinkFail(event)),
1868        };
1869        self.inner.add_event_log(req).await?;
1870        Ok(())
1871    }
1872
1873    pub async fn add_cdc_auto_schema_change_fail_event(
1874        &self,
1875        source_id: SourceId,
1876        table_name: String,
1877        cdc_table_id: String,
1878        upstream_ddl: String,
1879        fail_info: String,
1880    ) -> Result<()> {
1881        let event = event_log::EventAutoSchemaChangeFail {
1882            table_id: source_id.as_cdc_table_id(),
1883            table_name,
1884            cdc_table_id,
1885            upstream_ddl,
1886            fail_info,
1887        };
1888        let req = AddEventLogRequest {
1889            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1890        };
1891        self.inner.add_event_log(req).await?;
1892        Ok(())
1893    }
1894
1895    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1896        let req = CancelCompactTaskRequest {
1897            task_id,
1898            task_status: task_status as _,
1899        };
1900        let resp = self.inner.cancel_compact_task(req).await?;
1901        Ok(resp.ret)
1902    }
1903
1904    pub async fn get_version_by_epoch(
1905        &self,
1906        epoch: HummockEpoch,
1907        table_id: TableId,
1908    ) -> Result<PbHummockVersion> {
1909        let req = GetVersionByEpochRequest { epoch, table_id };
1910        let resp = self.inner.get_version_by_epoch(req).await?;
1911        Ok(resp.version.unwrap())
1912    }
1913
1914    pub async fn get_cluster_limits(
1915        &self,
1916    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1917        let req = GetClusterLimitsRequest {};
1918        let resp = self.inner.get_cluster_limits(req).await?;
1919        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1920    }
1921
1922    pub async fn merge_compaction_group(
1923        &self,
1924        left_group_id: CompactionGroupId,
1925        right_group_id: CompactionGroupId,
1926    ) -> Result<()> {
1927        let req = MergeCompactionGroupRequest {
1928            left_group_id,
1929            right_group_id,
1930        };
1931        self.inner.merge_compaction_group(req).await?;
1932        Ok(())
1933    }
1934
1935    /// List all rate limits for sources and backfills.
1936    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1937        let request = ListRateLimitsRequest {};
1938        let resp = self.inner.list_rate_limits(request).await?;
1939        Ok(resp.rate_limits)
1940    }
1941
1942    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1943        let request = ListCdcProgressRequest {};
1944        let resp = self.inner.list_cdc_progress(request).await?;
1945        Ok(resp.cdc_progress)
1946    }
1947
1948    pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1949        let request = ListRefreshTableStatesRequest {};
1950        let resp = self.inner.list_refresh_table_states(request).await?;
1951        Ok(resp.states)
1952    }
1953
1954    pub async fn list_sink_log_store_tables(
1955        &self,
1956    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1957        let request = ListSinkLogStoreTablesRequest {};
1958        let resp = self.inner.list_sink_log_store_tables(request).await?;
1959        Ok(resp.tables)
1960    }
1961
1962    pub async fn create_iceberg_table(
1963        &self,
1964        table_job_info: PbTableJobInfo,
1965        sink_job_info: PbSinkJobInfo,
1966        iceberg_source: PbSource,
1967        if_not_exists: bool,
1968    ) -> Result<WaitVersion> {
1969        let request = CreateIcebergTableRequest {
1970            table_info: Some(table_job_info),
1971            sink_info: Some(sink_job_info),
1972            iceberg_source: Some(iceberg_source),
1973            if_not_exists,
1974        };
1975
1976        let resp = self.inner.create_iceberg_table(request).await?;
1977        Ok(resp
1978            .version
1979            .ok_or_else(|| anyhow!("wait version not set"))?)
1980    }
1981
1982    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1983        let request = ListIcebergTablesRequest {};
1984        let resp = self.inner.list_iceberg_tables(request).await?;
1985        Ok(resp.iceberg_tables)
1986    }
1987
1988    /// Get the await tree of all nodes in the cluster.
1989    pub async fn get_cluster_stack_trace(
1990        &self,
1991        actor_traces_format: ActorTracesFormat,
1992    ) -> Result<StackTraceResponse> {
1993        let request = StackTraceRequest {
1994            actor_traces_format: actor_traces_format as i32,
1995        };
1996        let resp = self.inner.stack_trace(request).await?;
1997        Ok(resp)
1998    }
1999
2000    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
2001        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
2002        self.inner.set_sync_log_store_aligned(request).await?;
2003        Ok(())
2004    }
2005
2006    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
2007        self.inner.refresh(request).await
2008    }
2009
2010    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
2011        let request = ListUnmigratedTablesRequest {};
2012        let resp = self.inner.list_unmigrated_tables(request).await?;
2013        Ok(resp
2014            .tables
2015            .into_iter()
2016            .map(|table| (table.table_id, table.table_name))
2017            .collect())
2018    }
2019}
2020
2021#[async_trait]
2022impl HummockMetaClient for MetaClient {
2023    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
2024        let req = UnpinVersionBeforeRequest {
2025            context_id: self.worker_id(),
2026            unpin_version_before,
2027        };
2028        self.inner.unpin_version_before(req).await?;
2029        Ok(())
2030    }
2031
2032    async fn get_current_version(&self) -> Result<HummockVersion> {
2033        let req = GetCurrentVersionRequest::default();
2034        Ok(HummockVersion::from_rpc_protobuf(
2035            &self
2036                .inner
2037                .get_current_version(req)
2038                .await?
2039                .current_version
2040                .unwrap(),
2041        ))
2042    }
2043
2044    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
2045        let resp = self
2046            .inner
2047            .get_new_object_ids(GetNewObjectIdsRequest { number })
2048            .await?;
2049        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
2050    }
2051
2052    async fn commit_epoch_with_change_log(
2053        &self,
2054        _epoch: HummockEpoch,
2055        _sync_result: SyncResult,
2056        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
2057    ) -> Result<()> {
2058        panic!("Only meta service can commit_epoch in production.")
2059    }
2060
2061    async fn trigger_manual_compaction(
2062        &self,
2063        compaction_group_id: CompactionGroupId,
2064        table_id: JobId,
2065        level: u32,
2066        sst_ids: Vec<HummockSstableId>,
2067    ) -> Result<()> {
2068        // TODO: support key_range parameter
2069        let req = TriggerManualCompactionRequest {
2070            compaction_group_id,
2071            table_id,
2072            // If `table_id` does not exist, manual compaction will include all SSTs
2073            // without checking `internal_table_id`.
2074            level,
2075            sst_ids,
2076            ..Default::default()
2077        };
2078
2079        self.inner.trigger_manual_compaction(req).await?;
2080        Ok(())
2081    }
2082
2083    async fn trigger_full_gc(
2084        &self,
2085        sst_retention_time_sec: u64,
2086        prefix: Option<String>,
2087    ) -> Result<()> {
2088        self.inner
2089            .trigger_full_gc(TriggerFullGcRequest {
2090                sst_retention_time_sec,
2091                prefix,
2092            })
2093            .await?;
2094        Ok(())
2095    }
2096
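    // Opens the compaction event channel with the meta service: a `Register` event carrying this
    // worker's id is sent first, after which the returned sender can push further requests while
    // the returned stream yields compaction events from the meta service.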
2097    async fn subscribe_compaction_event(
2098        &self,
2099    ) -> Result<(
2100        UnboundedSender<SubscribeCompactionEventRequest>,
2101        BoxStream<'static, CompactionEventItem>,
2102    )> {
2103        let (request_sender, request_receiver) =
2104            unbounded_channel::<SubscribeCompactionEventRequest>();
2105        request_sender
2106            .send(SubscribeCompactionEventRequest {
2107                event: Some(subscribe_compaction_event_request::Event::Register(
2108                    Register {
2109                        context_id: self.worker_id(),
2110                    },
2111                )),
2112                create_at: SystemTime::now()
2113                    .duration_since(SystemTime::UNIX_EPOCH)
2114                    .expect("Clock may have gone backwards")
2115                    .as_millis() as u64,
2116            })
2117            .context("Failed to subscribe compaction event")?;
2118
2119        let stream = self
2120            .inner
2121            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
2122                request_receiver,
2123            )))
2124            .await?;
2125
2126        Ok((request_sender, Box::pin(stream)))
2127    }
2128
2129    async fn get_version_by_epoch(
2130        &self,
2131        epoch: HummockEpoch,
2132        table_id: TableId,
2133    ) -> Result<PbHummockVersion> {
2134        self.get_version_by_epoch(epoch, table_id).await
2135    }
2136
2137    async fn subscribe_iceberg_compaction_event(
2138        &self,
2139    ) -> Result<(
2140        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2141        BoxStream<'static, IcebergCompactionEventItem>,
2142    )> {
2143        let (request_sender, request_receiver) =
2144            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2145        request_sender
2146            .send(SubscribeIcebergCompactionEventRequest {
2147                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2148                    IcebergRegister {
2149                        context_id: self.worker_id(),
2150                    },
2151                )),
2152                create_at: SystemTime::now()
2153                    .duration_since(SystemTime::UNIX_EPOCH)
2154                    .expect("Clock may have gone backwards")
2155                    .as_millis() as u64,
2156            })
2157            .context("Failed to subscribe compaction event")?;
2158
2159        let stream = self
2160            .inner
2161            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2162                request_receiver,
2163            )))
2164            .await?;
2165
2166        Ok((request_sender, Box::pin(stream)))
2167    }
2168}
2169
2170#[async_trait]
2171impl TelemetryInfoFetcher for MetaClient {
2172    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2173        let resp = self
2174            .get_telemetry_info()
2175            .await
2176            .map_err(|e| e.to_report_string())?;
2177        let tracking_id = resp.get_tracking_id().ok();
2178        Ok(tracking_id.map(|id| id.to_owned()))
2179    }
2180}
2181
2182pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2183
2184#[derive(Debug, Clone)]
2185struct GrpcMetaClientCore {
2186    cluster_client: ClusterServiceClient<Channel>,
2187    meta_member_client: MetaMemberServiceClient<Channel>,
2188    heartbeat_client: HeartbeatServiceClient<Channel>,
2189    ddl_client: DdlServiceClient<Channel>,
2190    hummock_client: HummockManagerServiceClient<Channel>,
2191    notification_client: NotificationServiceClient<Channel>,
2192    stream_client: StreamManagerServiceClient<Channel>,
2193    user_client: UserServiceClient<Channel>,
2194    scale_client: ScaleServiceClient<Channel>,
2195    backup_client: BackupServiceClient<Channel>,
2196    telemetry_client: TelemetryInfoServiceClient<Channel>,
2197    system_params_client: SystemParamsServiceClient<Channel>,
2198    session_params_client: SessionParamServiceClient<Channel>,
2199    serving_client: ServingServiceClient<Channel>,
2200    cloud_client: CloudServiceClient<Channel>,
2201    sink_coordinate_client: SinkCoordinationRpcClient,
2202    event_log_client: EventLogServiceClient<Channel>,
2203    cluster_limit_client: ClusterLimitServiceClient<Channel>,
2204    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
2205    monitor_client: MonitorServiceClient<Channel>,
2206}
2207
2208impl GrpcMetaClientCore {
2209    pub(crate) fn new(channel: Channel) -> Self {
2210        let cluster_client = ClusterServiceClient::new(channel.clone());
2211        let meta_member_client = MetaMemberClient::new(channel.clone());
2212        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
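        // Several clients below lift tonic's default message decoding size limit, since their
        // responses (catalogs, Hummock versions, fragment information, etc.) can be large.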
2213        let ddl_client =
2214            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2215        let hummock_client =
2216            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2217        let notification_client =
2218            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2219        let stream_client =
2220            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2221        let user_client = UserServiceClient::new(channel.clone());
2222        let scale_client =
2223            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2224        let backup_client = BackupServiceClient::new(channel.clone());
2225        let telemetry_client =
2226            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2227        let system_params_client = SystemParamsServiceClient::new(channel.clone());
2228        let session_params_client = SessionParamServiceClient::new(channel.clone());
2229        let serving_client = ServingServiceClient::new(channel.clone());
2230        let cloud_client = CloudServiceClient::new(channel.clone());
2231        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2232            .max_decoding_message_size(usize::MAX);
2233        let event_log_client = EventLogServiceClient::new(channel.clone());
2234        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2235        let hosted_iceberg_catalog_service_client =
2236            HostedIcebergCatalogServiceClient::new(channel.clone());
2237        let monitor_client = MonitorServiceClient::new(channel);
2238
2239        GrpcMetaClientCore {
2240            cluster_client,
2241            meta_member_client,
2242            heartbeat_client,
2243            ddl_client,
2244            hummock_client,
2245            notification_client,
2246            stream_client,
2247            user_client,
2248            scale_client,
2249            backup_client,
2250            telemetry_client,
2251            system_params_client,
2252            session_params_client,
2253            serving_client,
2254            cloud_client,
2255            sink_coordinate_client,
2256            event_log_client,
2257            cluster_limit_client,
2258            hosted_iceberg_catalog_service_client,
2259            monitor_client,
2260        }
2261    }
2262}
2263
2264/// Client to meta server. Cloning the instance is lightweight.
2265///
2266/// It is a wrapper of tonic client. See [`crate::meta_rpc_client_method_impl`].
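/// The per-RPC methods on this type are generated by the `for_all_meta_rpc!` macro at the bottom
/// of this file; each call goes through the `GrpcMetaClientCore` behind the `RwLock`, which the
/// member monitor rebuilds whenever the meta leader changes.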
2267#[derive(Debug, Clone)]
2268struct GrpcMetaClient {
2269    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
2270    core: Arc<RwLock<GrpcMetaClientCore>>,
2271}
2272
2273type MetaMemberClient = MetaMemberServiceClient<Channel>;
2274
2275struct MetaMemberGroup {
2276    members: LruCache<http::Uri, Option<MetaMemberClient>>,
2277}
2278
2279struct MetaMemberManagement {
2280    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
2281    members: Either<MetaMemberClient, MetaMemberGroup>,
2282    current_leader: http::Uri,
2283    meta_config: Arc<MetaConfig>,
2284}
2285
2286impl MetaMemberManagement {
2287    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2288
2289    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2290        format!("http://{}:{}", addr.host, addr.port)
2291            .parse()
2292            .unwrap()
2293    }
2294
2295    async fn recreate_core(&self, channel: Channel) {
2296        let mut core = self.core_ref.write().await;
2297        *core = GrpcMetaClientCore::new(channel);
2298    }
2299
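    /// Queries the member list, either from the single load-balanced client or from each cached
    /// member client in turn, records any newly discovered members, and, if a different leader is
    /// found, reconnects to it and rebuilds the client core.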
2300    async fn refresh_members(&mut self) -> Result<()> {
2301        let leader_addr = match self.members.as_mut() {
2302            Either::Left(client) => {
2303                let resp = client
2304                    .to_owned()
2305                    .members(MembersRequest {})
2306                    .await
2307                    .map_err(RpcError::from_meta_status)?;
2308                let resp = resp.into_inner();
2309                resp.members.into_iter().find(|member| member.is_leader)
2310            }
2311            Either::Right(member_group) => {
2312                let mut fetched_members = None;
2313
2314                for (addr, client) in &mut member_group.members {
2315                    let members: Result<_> = try {
2316                        let mut client = match client {
2317                            Some(cached_client) => cached_client.to_owned(),
2318                            None => {
2319                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2320                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2321                                    .await
2322                                    .context("failed to create client")?;
2323                                let new_client: MetaMemberClient =
2324                                    MetaMemberServiceClient::new(channel);
2325                                *client = Some(new_client.clone());
2326
2327                                new_client
2328                            }
2329                        };
2330
2331                        let resp = client
2332                            .members(MembersRequest {})
2333                            .await
2334                            .context("failed to fetch members")?;
2335
2336                        resp.into_inner().members
2337                    };
2338
2339                    let fetched = members.is_ok();
2340                    fetched_members = Some(members);
2341                    if fetched {
2342                        break;
2343                    }
2344                }
2345
2346                let members = fetched_members
2347                    .context("no member available in the list")?
2348                    .context("could not refresh members")?;
2349
2350                // find new leader
2351                let mut leader = None;
2352                for member in members {
2353                    if member.is_leader {
2354                        leader = Some(member.clone());
2355                    }
2356
2357                    let addr = Self::host_address_to_uri(member.address.unwrap());
2358                    // We don't clean any expired addrs here to deal with some extreme situations.
2359                    if !member_group.members.contains(&addr) {
2360                        tracing::info!("new meta member joined: {}", addr);
2361                        member_group.members.put(addr, None);
2362                    }
2363                }
2364
2365                leader
2366            }
2367        };
2368
2369        if let Some(leader) = leader_addr {
2370            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2371
2372            if discovered_leader != self.current_leader {
2373                tracing::info!("new meta leader {} discovered", discovered_leader);
2374
2375                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2376                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2377                    false,
2378                );
2379
2380                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2381                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2382                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2383                })
2384                .await?;
2385
2386                self.recreate_core(channel).await;
2387                self.current_leader = discovered_leader;
2388            }
2389        }
2390
2391        Ok(())
2392    }
2393}
2394
2395impl GrpcMetaClient {
2396    // See `Endpoint::http2_keep_alive_interval`
2397    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2398    // See `Endpoint::keep_alive_timeout`
2399    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2400    // Retry base interval in ms for connecting to meta server.
2401    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2402    // Max retry interval in ms for connecting to meta server.
2403    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2404
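    /// Spawns a background task that refreshes the member list: periodically when a member group
    /// (the `List` address strategy) is in use, and on demand whenever a refresh is requested
    /// through `force_refresh_receiver`; the result of each refresh is reported back to the requester.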
2405    fn start_meta_member_monitor(
2406        &self,
2407        init_leader_addr: http::Uri,
2408        members: Either<MetaMemberClient, MetaMemberGroup>,
2409        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2410        meta_config: Arc<MetaConfig>,
2411    ) -> Result<()> {
2412        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2413        let current_leader = init_leader_addr;
2414
2415        let enable_period_tick = matches!(members, Either::Right(_));
2416
2417        let member_management = MetaMemberManagement {
2418            core_ref,
2419            members,
2420            current_leader,
2421            meta_config,
2422        };
2423
2424        let mut force_refresh_receiver = force_refresh_receiver;
2425
2426        tokio::spawn(async move {
2427            let mut member_management = member_management;
2428            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2429
2430            loop {
2431                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2432                    tokio::select! {
2433                        _ = ticker.tick() => None,
2434                        result_sender = force_refresh_receiver.recv() => {
2435                            if result_sender.is_none() {
2436                                break;
2437                            }
2438
2439                            result_sender
2440                        },
2441                    }
2442                } else {
2443                    let result_sender = force_refresh_receiver.recv().await;
2444
2445                    if result_sender.is_none() {
2446                        break;
2447                    }
2448
2449                    result_sender
2450                };
2451
2452                let tick_result = member_management.refresh_members().await;
2453                if let Err(e) = tick_result.as_ref() {
2454                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2455                }
2456
2457                if let Some(sender) = event {
2458                    // ignore resp
2459                    let _resp = sender.send(tick_result);
2460                }
2461            }
2462        });
2463
2464        Ok(())
2465    }
2466
2467    async fn force_refresh_leader(&self) -> Result<()> {
2468        let (sender, receiver) = oneshot::channel();
2469
2470        self.member_monitor_event_sender
2471            .send(sender)
2472            .await
2473            .map_err(|e| anyhow!(e))?;
2474
2475        receiver.await.map_err(|e| anyhow!(e))?
2476    }
2477
2478    /// Connect to the meta server according to the given address `strategy`.
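    ///
    /// Establishes an initial channel to one of the configured addresses, starts the meta member
    /// monitor, and forces one leader refresh before returning, so the returned client already
    /// talks to the current leader.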
2479    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2480        let (channel, addr) = match strategy {
2481            MetaAddressStrategy::LoadBalance(addr) => {
2482                Self::try_build_rpc_channel(vec![addr.clone()]).await
2483            }
2484            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2485        }?;
2486        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2487        let client = GrpcMetaClient {
2488            member_monitor_event_sender: force_refresh_sender,
2489            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2490        };
2491
2492        let meta_member_client = client.core.read().await.meta_member_client.clone();
2493        let members = match strategy {
2494            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2495            MetaAddressStrategy::List(addrs) => {
2496                let mut members = LruCache::new(20.try_into().unwrap());
2497                for addr in addrs {
2498                    members.put(addr.clone(), None);
2499                }
2500                members.put(addr.clone(), Some(meta_member_client));
2501
2502                Either::Right(MetaMemberGroup { members })
2503            }
2504        };
2505
2506        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2507
2508        client.force_refresh_leader().await?;
2509
2510        Ok(client)
2511    }
2512
2513    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2514        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2515    }
2516
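    /// Tries each address in order and returns the first successfully connected channel together
    /// with the address it was built from. Fails only if every address is unreachable, or if no
    /// address was provided at all.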
2517    pub(crate) async fn try_build_rpc_channel(
2518        addrs: impl IntoIterator<Item = http::Uri>,
2519    ) -> Result<(Channel, http::Uri)> {
2520        let endpoints: Vec<_> = addrs
2521            .into_iter()
2522            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2523            .collect();
2524
2525        let mut last_error = None;
2526
2527        for (endpoint, addr) in endpoints {
2528            match Self::connect_to_endpoint(endpoint).await {
2529                Ok(channel) => {
2530                    tracing::info!("Connect to meta server {} successfully", addr);
2531                    return Ok((channel, addr));
2532                }
2533                Err(e) => {
2534                    tracing::warn!(
2535                        error = %e.as_report(),
2536                        "Failed to connect to meta server {}, trying again",
2537                        addr,
2538                    );
2539                    last_error = Some(e);
2540                }
2541            }
2542        }
2543
2544        if let Some(last_error) = last_error {
2545            Err(anyhow::anyhow!(last_error)
2546                .context("failed to connect to all meta servers")
2547                .into())
2548        } else {
2549            bail!("no meta server address provided")
2550        }
2551    }
2552
2553    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2554        let channel = endpoint
2555            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2556            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2557            .connect_timeout(Duration::from_secs(5))
2558            .monitored_connect("grpc-meta-client", Default::default())
2559            .await?
2560            .wrapped();
2561
2562        Ok(channel)
2563    }
2564
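    /// Builds a jittered exponential backoff sequence bounded by a total duration: intervals are
    /// yielded while their running sum stays below `high_bound`; if `exceed` is true, the first
    /// interval that crosses the bound is also yielded.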
2565    pub(crate) fn retry_strategy_to_bound(
2566        high_bound: Duration,
2567        exceed: bool,
2568    ) -> impl Iterator<Item = Duration> {
2569        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2570            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2571            .map(jitter);
2572
2573        let mut sum = Duration::default();
2574
2575        iter.take_while(move |duration| {
2576            sum += *duration;
2577
2578            if exceed {
2579                sum < high_bound + *duration
2580            } else {
2581                sum < high_bound
2582            }
2583        })
2584    }
2585}
2586
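/// Invokes `$macro` with the full table of `{ client field, RPC method, request type, response
/// type }` entries exposed by the meta service. It is used below to generate a delegating method
/// on [`GrpcMetaClient`] for every listed RPC via `meta_rpc_client_method_impl`.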
2587macro_rules! for_all_meta_rpc {
2588    ($macro:ident) => {
2589        $macro! {
2590             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2591            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2592            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2593            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2594            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2595            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2596            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2597            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2598            ,{ stream_client, flush, FlushRequest, FlushResponse }
2599            ,{ stream_client, pause, PauseRequest, PauseResponse }
2600            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2601            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2602            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2603            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2604            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2605            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2606            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2607            ,{ stream_client, list_sink_log_store_tables, ListSinkLogStoreTablesRequest, ListSinkLogStoreTablesResponse }
2608            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2609            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2610            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2611            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2612            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
2613            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
2614            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2615            ,{ stream_client, alter_source_properties_safe, AlterSourcePropertiesSafeRequest, AlterSourcePropertiesSafeResponse }
2616            ,{ stream_client, reset_source_splits, ResetSourceSplitsRequest, ResetSourceSplitsResponse }
2617            ,{ stream_client, inject_source_offsets, InjectSourceOffsetsRequest, InjectSourceOffsetsResponse }
2618            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2619            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
2620            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
2621            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2622            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2623            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
2624            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2625            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2626            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2627            ,{ ddl_client, alter_subscription_retention, AlterSubscriptionRetentionRequest, AlterSubscriptionRetentionResponse }
2628            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2629            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2630            ,{ ddl_client, alter_backfill_parallelism, AlterBackfillParallelismRequest, AlterBackfillParallelismResponse }
2631            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
2632            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
2633            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2634            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2635            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2636            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2637            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2638            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2639            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2640            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2641            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2642            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2643            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2644            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2645            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2646            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2647            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2648            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2649            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2650            ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
2651            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2652            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2653            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2654            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2655            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2656            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2657            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2658            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2659            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2660            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2661            ,{ ddl_client, risectl_resume_backfill, RisectlResumeBackfillRequest, RisectlResumeBackfillResponse }
2662            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2663            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2664            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2665            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2666            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2667            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2668            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2669            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2670            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2671            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2672            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2673            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
2674            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
2675            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2676            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2677            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2678            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2679            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2680            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2681            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2682            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2683            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2684            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2685            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2686            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2687            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2688            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2689            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2690            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2691            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2692            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2693            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2694            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2695            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2696            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2697            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2698            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2699            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2700            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2701            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2702            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2703            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2704            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2705            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2706            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2707            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2708            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2709            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2710            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2711            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2712            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2713            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2714            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2715            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2716            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2717            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2718            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2719            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2720            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2721            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2722            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2723            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2724            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2725            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2726            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2727            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2728            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2729            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2730            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2731        }
2732    };
2733}
2734
2735impl GrpcMetaClient {
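    /// For gRPC statuses that may indicate the current meta leader is no longer serving
    /// (`Unknown`, `Unimplemented`, `Unavailable`), asks the member monitor to refresh the leader
    /// and waits for the result; if another refresh is already in flight, this one is skipped.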
2736    async fn refresh_client_if_needed(&self, code: Code) {
2737        if matches!(
2738            code,
2739            Code::Unknown | Code::Unimplemented | Code::Unavailable
2740        ) {
2741            tracing::debug!("matching tonic code {}", code);
2742            let (result_sender, result_receiver) = oneshot::channel();
2743            if self
2744                .member_monitor_event_sender
2745                .try_send(result_sender)
2746                .is_ok()
2747            {
2748                if let Ok(Err(e)) = result_receiver.await {
2749                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2750                }
2751            } else {
2752                tracing::debug!("skipping this refresh, another refresh is already in progress")
2753            }
2754        }
2755    }
2756}
2757
2758impl GrpcMetaClient {
2759    for_all_meta_rpc! { meta_rpc_client_method_impl }
2760}