risingwave_rpc_client/meta_client.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::id::{
    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, UserId, ViewId, WorkerId,
};
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::id::{ActorId, FragmentId, HummockSstableId, SourceId};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

/// Client to meta server. Cloning the instance is lightweight.
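///
/// The sketch below illustrates the typical worker lifecycle against the meta
/// service. It is illustrative only (not compiled) and assumes the caller has
/// already built `addr_strategy`, `addr`, `property`, and `meta_config`.
///
/// ```ignore
/// // Register this node with the meta service (exits the process on failure).
/// let (meta_client, system_params) = MetaClient::register_new(
///     addr_strategy,
///     WorkerType::ComputeNode,
///     &addr,
///     property,
///     meta_config,
/// )
/// .await;
///
/// // Confirm the node is ready to serve and keep it alive with heartbeats.
/// meta_client.activate(&addr).await?;
/// let (heartbeat_handle, heartbeat_shutdown) =
///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
///
/// // ...serve...
///
/// // Best-effort unregistration on shutdown.
/// meta_client.try_unregister().await;
/// ```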
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: WorkerId,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> WorkerId {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

    /// Subscribe to notification from meta.
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

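        // Bound retries by `max_heartbeat_interval_secs`, mirroring the registration
        // path, so that transient meta unavailability is tolerated.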
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSecretRequest { secret_id, cascade };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    /// Register the current node to the cluster and set the corresponding worker id.
    ///
    /// Retry if there's connection issue with the meta node. Exit the process if the registration fails.
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        // Retry until reaching `max_heartbeat_interval_secs`
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Only retry if there's any transient connection issue.
            // If the error is from our implementation or business, do not retry it.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report panic event to meta node.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

    /// Activate the current node in cluster to confirm it's ready to serve.
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

    /// Send heartbeat signal to meta service.
    pub async fn send_heartbeat(&self, node_id: WorkerId) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Ignore the error if we're already shutting down.
            // Otherwise, exit the process.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            resource_type: Some(PbStreamingJobResourceType {
                resource_type: Some(resource_type),
            }),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest { table_id, cascade };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

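    /// Update a single runtime parameter of the given database.
    ///
    /// Illustrative call (not compiled), assuming the `BarrierIntervalMs` variant
    /// carries an `Option<u32>` and that `client` and `database_id` exist in the
    /// caller's context:
    ///
    /// ```ignore
    /// client
    ///     .alter_database_param(database_id, AlterDatabaseParam::BarrierIntervalMs(Some(500)))
    ///     .await?;
    /// ```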
    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id: job_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()> {
        let request = AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        };

        self.inner.alter_streaming_job_config(request).await?;
        Ok(())
    }

    pub async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()> {
        let request = AlterFragmentParallelismRequest {
            fragment_ids,
            parallelism,
        };

        self.inner.alter_fragment_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: JobId,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
            table_id,
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
        let request = ResetSourceRequest { source_id };
        let resp = self.inner.reset_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest { index_id, cascade };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    // TODO: using UserInfoVersion instead as return type.
    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: UserId) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: UserId,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        granted_by: UserId,
        revoke_by: UserId,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<UserId>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: UserId,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

    /// Unregister the current node from the cluster.
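    ///
    /// This also marks the client as shutting down, so a later "unknown worker"
    /// heartbeat response will not make the process exit.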
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    /// Try to unregister the current worker from the cluster with best effort. Log the result.
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = %self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = %self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[WorkerId],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

    /// Starts a heartbeat worker.
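    ///
    /// Returns the join handle of the spawned loop together with a oneshot sender;
    /// sending to (or dropping) the sender stops the loop.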
1067    pub fn start_heartbeat_loop(
1068        meta_client: MetaClient,
1069        min_interval: Duration,
1070    ) -> (JoinHandle<()>, Sender<()>) {
1071        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1072        let join_handle = tokio::spawn(async move {
1073            let mut min_interval_ticker = tokio::time::interval(min_interval);
1074            loop {
1075                tokio::select! {
1076                    biased;
1077                    // Shutdown
1078                    _ = &mut shutdown_rx => {
1079                        tracing::info!("Heartbeat loop is stopped");
1080                        return;
1081                    }
1082                    // Wait for interval
1083                    _ = min_interval_ticker.tick() => {},
1084                }
1085                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
1086                match tokio::time::timeout(
1087                    // TODO: decide better min_interval for timeout
1088                    min_interval * 3,
1089                    meta_client.send_heartbeat(meta_client.worker_id()),
1090                )
1091                .await
1092                {
1093                    Ok(Ok(_)) => {}
1094                    Ok(Err(err)) => {
1095                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
1096                    }
1097                    Err(_) => {
1098                        tracing::warn!("Failed to send_heartbeat: timeout");
1099                    }
1100                }
1101            }
1102        });
1103        (join_handle, shutdown_tx)
1104    }
1105
1106    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1107        let request = RisectlListStateTablesRequest {};
1108        let resp = self.inner.risectl_list_state_tables(request).await?;
1109        Ok(resp.tables)
1110    }
1111
1112    pub async fn risectl_resume_backfill(
1113        &self,
1114        request: RisectlResumeBackfillRequest,
1115    ) -> Result<()> {
1116        self.inner.risectl_resume_backfill(request).await?;
1117        Ok(())
1118    }
1119
1120    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1121        let request = FlushRequest { database_id };
1122        let resp = self.inner.flush(request).await?;
1123        Ok(resp.hummock_version_id)
1124    }
1125
1126    pub async fn wait(&self) -> Result<()> {
1127        let request = WaitRequest {};
1128        self.inner.wait(request).await?;
1129        Ok(())
1130    }
1131
1132    pub async fn recover(&self) -> Result<()> {
1133        let request = RecoverRequest {};
1134        self.inner.recover(request).await?;
1135        Ok(())
1136    }
1137
1138    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1139        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1140        let resp = self.inner.cancel_creating_jobs(request).await?;
1141        Ok(resp.canceled_jobs)
1142    }
1143
1144    pub async fn list_table_fragments(
1145        &self,
1146        job_ids: &[JobId],
1147    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
1148        let request = ListTableFragmentsRequest {
1149            table_ids: job_ids.to_vec(),
1150        };
1151        let resp = self.inner.list_table_fragments(request).await?;
1152        Ok(resp.table_fragments)
1153    }
1154
1155    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1156        let resp = self
1157            .inner
1158            .list_streaming_job_states(ListStreamingJobStatesRequest {})
1159            .await?;
1160        Ok(resp.states)
1161    }
1162
1163    pub async fn list_fragment_distributions(
1164        &self,
1165        include_node: bool,
1166    ) -> Result<Vec<FragmentDistribution>> {
1167        let resp = self
1168            .inner
1169            .list_fragment_distribution(ListFragmentDistributionRequest {
1170                include_node: Some(include_node),
1171            })
1172            .await?;
1173        Ok(resp.distributions)
1174    }
1175
1176    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
1177        let resp = self
1178            .inner
1179            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {
1180                include_node: Some(true),
1181            })
1182            .await?;
1183        Ok(resp.distributions)
1184    }
1185
1186    pub async fn get_fragment_by_id(
1187        &self,
1188        fragment_id: FragmentId,
1189    ) -> Result<Option<FragmentDistribution>> {
1190        let resp = self
1191            .inner
1192            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1193            .await?;
1194        Ok(resp.distribution)
1195    }
1196
1197    pub async fn get_fragment_vnodes(
1198        &self,
1199        fragment_id: FragmentId,
1200    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
1201        let resp = self
1202            .inner
1203            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
1204            .await?;
1205        Ok(resp
1206            .actor_vnodes
1207            .into_iter()
1208            .map(|actor| (actor.actor_id, actor.vnode_indices))
1209            .collect())
1210    }
1211
1212    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
1213        let resp = self
1214            .inner
1215            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
1216            .await?;
1217        Ok(resp.vnode_indices)
1218    }
1219
1220    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1221        let resp = self
1222            .inner
1223            .list_actor_states(ListActorStatesRequest {})
1224            .await?;
1225        Ok(resp.states)
1226    }
1227
1228    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1229        let resp = self
1230            .inner
1231            .list_actor_splits(ListActorSplitsRequest {})
1232            .await?;
1233
1234        Ok(resp.actor_splits)
1235    }
1236
1237    pub async fn pause(&self) -> Result<PauseResponse> {
1238        let request = PauseRequest {};
1239        let resp = self.inner.pause(request).await?;
1240        Ok(resp)
1241    }
1242
1243    pub async fn resume(&self) -> Result<ResumeResponse> {
1244        let request = ResumeRequest {};
1245        let resp = self.inner.resume(request).await?;
1246        Ok(resp)
1247    }
1248
1249    pub async fn apply_throttle(
1250        &self,
1251        throttle_target: PbThrottleTarget,
1252        throttle_type: risingwave_pb::common::PbThrottleType,
1253        id: u32,
1254        rate: Option<u32>,
1255    ) -> Result<ApplyThrottleResponse> {
1256        let request = ApplyThrottleRequest {
1257            throttle_target: throttle_target as i32,
1258            throttle_type: throttle_type as i32,
1259            id,
1260            rate,
1261        };
1262        let resp = self.inner.apply_throttle(request).await?;
1263        Ok(resp)
1264    }
1265
1266    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1267        let resp = self
1268            .inner
1269            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1270            .await?;
1271        Ok(resp.get_status().unwrap())
1272    }
1273
1274    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1275        let request = GetClusterInfoRequest {};
1276        let resp = self.inner.get_cluster_info(request).await?;
1277        Ok(resp)
1278    }
1279
1280    pub async fn reschedule(
1281        &self,
1282        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1283        revision: u64,
1284        resolve_no_shuffle_upstream: bool,
1285    ) -> Result<(bool, u64)> {
1286        let request = RescheduleRequest {
1287            revision,
1288            resolve_no_shuffle_upstream,
1289            worker_reschedules,
1290        };
1291        let resp = self.inner.reschedule(request).await?;
1292        Ok((resp.success, resp.revision))
1293    }
1294
1295    pub async fn risectl_get_pinned_versions_summary(
1296        &self,
1297    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1298        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1299        self.inner
1300            .rise_ctl_get_pinned_versions_summary(request)
1301            .await
1302    }
1303
1304    pub async fn risectl_get_checkpoint_hummock_version(
1305        &self,
1306    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1307        let request = RiseCtlGetCheckpointVersionRequest {};
1308        self.inner.rise_ctl_get_checkpoint_version(request).await
1309    }
1310
1311    pub async fn risectl_pause_hummock_version_checkpoint(
1312        &self,
1313    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1314        let request = RiseCtlPauseVersionCheckpointRequest {};
1315        self.inner.rise_ctl_pause_version_checkpoint(request).await
1316    }
1317
1318    pub async fn risectl_resume_hummock_version_checkpoint(
1319        &self,
1320    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1321        let request = RiseCtlResumeVersionCheckpointRequest {};
1322        self.inner.rise_ctl_resume_version_checkpoint(request).await
1323    }
1324
1325    pub async fn init_metadata_for_replay(
1326        &self,
1327        tables: Vec<PbTable>,
1328        compaction_groups: Vec<CompactionGroupInfo>,
1329    ) -> Result<()> {
1330        let req = InitMetadataForReplayRequest {
1331            tables,
1332            compaction_groups,
1333        };
1334        let _resp = self.inner.init_metadata_for_replay(req).await?;
1335        Ok(())
1336    }
1337
1338    pub async fn replay_version_delta(
1339        &self,
1340        version_delta: HummockVersionDelta,
1341    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1342        let req = ReplayVersionDeltaRequest {
1343            version_delta: Some(version_delta.into()),
1344        };
1345        let resp = self.inner.replay_version_delta(req).await?;
1346        Ok((
1347            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1348            resp.modified_compaction_groups,
1349        ))
1350    }
1351
1352    pub async fn list_version_deltas(
1353        &self,
1354        start_id: HummockVersionId,
1355        num_limit: u32,
1356        committed_epoch_limit: HummockEpoch,
1357    ) -> Result<Vec<HummockVersionDelta>> {
1358        let req = ListVersionDeltasRequest {
1359            start_id,
1360            num_limit,
1361            committed_epoch_limit,
1362        };
1363        Ok(self
1364            .inner
1365            .list_version_deltas(req)
1366            .await?
1367            .version_deltas
1368            .unwrap()
1369            .version_deltas
1370            .iter()
1371            .map(HummockVersionDelta::from_rpc_protobuf)
1372            .collect())
1373    }
1374
1375    pub async fn trigger_compaction_deterministic(
1376        &self,
1377        version_id: HummockVersionId,
1378        compaction_groups: Vec<CompactionGroupId>,
1379    ) -> Result<()> {
1380        let req = TriggerCompactionDeterministicRequest {
1381            version_id,
1382            compaction_groups,
1383        };
1384        self.inner.trigger_compaction_deterministic(req).await?;
1385        Ok(())
1386    }
1387
1388    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1389        let req = DisableCommitEpochRequest {};
1390        Ok(HummockVersion::from_rpc_protobuf(
1391            &self
1392                .inner
1393                .disable_commit_epoch(req)
1394                .await?
1395                .current_version
1396                .unwrap(),
1397        ))
1398    }
1399
1400    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1401        let req = GetAssignedCompactTaskNumRequest {};
1402        let resp = self.inner.get_assigned_compact_task_num(req).await?;
1403        Ok(resp.num_tasks as usize)
1404    }
1405
1406    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1407        let req = RiseCtlListCompactionGroupRequest {};
1408        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1409        Ok(resp.compaction_groups)
1410    }
1411
1412    pub async fn risectl_update_compaction_config(
1413        &self,
1414        compaction_groups: &[CompactionGroupId],
1415        configs: &[MutableConfig],
1416    ) -> Result<()> {
1417        let req = RiseCtlUpdateCompactionConfigRequest {
1418            compaction_group_ids: compaction_groups.to_vec(),
1419            configs: configs
1420                .iter()
1421                .map(
1422                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1423                        mutable_config: Some(c.clone()),
1424                    },
1425                )
1426                .collect(),
1427        };
1428        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1429        Ok(())
1430    }
1431
1432    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1433        let req = BackupMetaRequest { remarks };
1434        let resp = self.inner.backup_meta(req).await?;
1435        Ok(resp.job_id)
1436    }
1437
1438    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1439        let req = GetBackupJobStatusRequest { job_id };
1440        let resp = self.inner.get_backup_job_status(req).await?;
1441        Ok((resp.job_status(), resp.message))
1442    }
1443
1444    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1445        let req = DeleteMetaSnapshotRequest {
1446            snapshot_ids: snapshot_ids.to_vec(),
1447        };
1448        let _resp = self.inner.delete_meta_snapshot(req).await?;
1449        Ok(())
1450    }
1451
1452    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1453        let req = GetMetaSnapshotManifestRequest {};
1454        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1455        Ok(resp.manifest.expect("should exist"))
1456    }
1457
1458    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1459        let req = GetTelemetryInfoRequest {};
1460        let resp = self.inner.get_telemetry_info(req).await?;
1461        Ok(resp)
1462    }
1463
1464    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1465        let req = GetMetaStoreInfoRequest {};
1466        let resp = self.inner.get_meta_store_info(req).await?;
1467        Ok(resp.meta_store_endpoint)
1468    }
1469
1470    pub async fn alter_sink_props(
1471        &self,
1472        sink_id: SinkId,
1473        changed_props: BTreeMap<String, String>,
1474        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1475        connector_conn_ref: Option<ConnectionId>,
1476    ) -> Result<()> {
1477        let req = AlterConnectorPropsRequest {
1478            object_id: sink_id.as_raw_id(),
1479            changed_props: changed_props.into_iter().collect(),
1480            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1481            connector_conn_ref,
1482            object_type: AlterConnectorPropsObject::Sink as i32,
1483            extra_options: None,
1484        };
1485        let _resp = self.inner.alter_connector_props(req).await?;
1486        Ok(())
1487    }
1488
1489    pub async fn alter_iceberg_table_props(
1490        &self,
1491        table_id: TableId,
1492        sink_id: SinkId,
1493        source_id: SourceId,
1494        changed_props: BTreeMap<String, String>,
1495        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1496        connector_conn_ref: Option<ConnectionId>,
1497    ) -> Result<()> {
1498        let req = AlterConnectorPropsRequest {
1499            object_id: table_id.as_raw_id(),
1500            changed_props: changed_props.into_iter().collect(),
1501            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1502            connector_conn_ref,
1503            object_type: AlterConnectorPropsObject::IcebergTable as i32,
1504            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1505                sink_id,
1506                source_id,
1507            })),
1508        };
1509        let _resp = self.inner.alter_connector_props(req).await?;
1510        Ok(())
1511    }
1512
1513    pub async fn alter_source_connector_props(
1514        &self,
1515        source_id: SourceId,
1516        changed_props: BTreeMap<String, String>,
1517        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1518        connector_conn_ref: Option<ConnectionId>,
1519    ) -> Result<()> {
1520        let req = AlterConnectorPropsRequest {
1521            object_id: source_id.as_raw_id(),
1522            changed_props: changed_props.into_iter().collect(),
1523            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1524            connector_conn_ref,
1525            object_type: AlterConnectorPropsObject::Source as i32,
1526            extra_options: None,
1527        };
1528        let _resp = self.inner.alter_connector_props(req).await?;
1529        Ok(())
1530    }
1531
1532    pub async fn alter_connection_connector_props(
1533        &self,
1534        connection_id: u32,
1535        changed_props: BTreeMap<String, String>,
1536        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1537    ) -> Result<()> {
1538        let req = AlterConnectorPropsRequest {
1539            object_id: connection_id,
1540            changed_props: changed_props.into_iter().collect(),
1541            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1542            connector_conn_ref: None, // Connections don't reference other connections
1543            object_type: AlterConnectorPropsObject::Connection as i32,
1544            extra_options: None,
1545        };
1546        let _resp = self.inner.alter_connector_props(req).await?;
1547        Ok(())
1548    }
1549
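    /// Sets a single system parameter and returns a reader over the updated system
    /// parameters when the meta service includes them in the response.
    ///
    /// A minimal usage sketch (illustrative only; `meta_client` and the parameter
    /// name/value are assumptions):
    ///
    /// ```ignore
    /// let updated = meta_client
    ///     .set_system_param("checkpoint_frequency".to_owned(), Some("10".to_owned()))
    ///     .await?;
    /// if updated.is_some() {
    ///     tracing::info!("system parameter applied");
    /// }
    /// ```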
1550    pub async fn set_system_param(
1551        &self,
1552        param: String,
1553        value: Option<String>,
1554    ) -> Result<Option<SystemParamsReader>> {
1555        let req = SetSystemParamRequest { param, value };
1556        let resp = self.inner.set_system_param(req).await?;
1557        Ok(resp.params.map(SystemParamsReader::from))
1558    }
1559
1560    pub async fn get_session_params(&self) -> Result<String> {
1561        let req = GetSessionParamsRequest {};
1562        let resp = self.inner.get_session_params(req).await?;
1563        Ok(resp.params)
1564    }
1565
1566    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1567        let req = SetSessionParamRequest { param, value };
1568        let resp = self.inner.set_session_param(req).await?;
1569        Ok(resp.param)
1570    }
1571
1572    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1573        let req = GetDdlProgressRequest {};
1574        let resp = self.inner.get_ddl_progress(req).await?;
1575        Ok(resp.ddl_progress)
1576    }
1577
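    /// Splits `table_ids_to_new_group` out of compaction group `group_id` into a newly
    /// created group and returns the id of the new group.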
1578    pub async fn split_compaction_group(
1579        &self,
1580        group_id: CompactionGroupId,
1581        table_ids_to_new_group: &[TableId],
1582        partition_vnode_count: u32,
1583    ) -> Result<CompactionGroupId> {
1584        let req = SplitCompactionGroupRequest {
1585            group_id,
1586            table_ids: table_ids_to_new_group.to_vec(),
1587            partition_vnode_count,
1588        };
1589        let resp = self.inner.split_compaction_group(req).await?;
1590        Ok(resp.new_group_id)
1591    }
1592
1593    pub async fn get_tables(
1594        &self,
1595        table_ids: Vec<TableId>,
1596        include_dropped_tables: bool,
1597    ) -> Result<HashMap<TableId, Table>> {
1598        let req = GetTablesRequest {
1599            table_ids,
1600            include_dropped_tables,
1601        };
1602        let resp = self.inner.get_tables(req).await?;
1603        Ok(resp.tables)
1604    }
1605
1606    pub async fn list_serving_vnode_mappings(
1607        &self,
1608    ) -> Result<HashMap<FragmentId, (JobId, WorkerSlotMapping)>> {
1609        let req = GetServingVnodeMappingsRequest {};
1610        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1611        let mappings = resp
1612            .worker_slot_mappings
1613            .into_iter()
1614            .map(|p| {
1615                (
1616                    p.fragment_id,
1617                    (
1618                        resp.fragment_to_table
1619                            .get(&p.fragment_id)
1620                            .cloned()
1621                            .unwrap_or_default(),
1622                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1623                    ),
1624                )
1625            })
1626            .collect();
1627        Ok(mappings)
1628    }
1629
1630    pub async fn risectl_list_compaction_status(
1631        &self,
1632    ) -> Result<(
1633        Vec<CompactStatus>,
1634        Vec<CompactTaskAssignment>,
1635        Vec<CompactTaskProgress>,
1636    )> {
1637        let req = RiseCtlListCompactionStatusRequest {};
1638        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1639        Ok((
1640            resp.compaction_statuses,
1641            resp.task_assignment,
1642            resp.task_progress,
1643        ))
1644    }
1645
1646    pub async fn get_compaction_score(
1647        &self,
1648        compaction_group_id: CompactionGroupId,
1649    ) -> Result<Vec<PickerInfo>> {
1650        let req = GetCompactionScoreRequest {
1651            compaction_group_id,
1652        };
1653        let resp = self.inner.get_compaction_score(req).await?;
1654        Ok(resp.scores)
1655    }
1656
1657    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1658        let req = RiseCtlRebuildTableStatsRequest {};
1659        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1660        Ok(())
1661    }
1662
1663    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1664        let req = ListBranchedObjectRequest {};
1665        let resp = self.inner.list_branched_object(req).await?;
1666        Ok(resp.branched_objects)
1667    }
1668
1669    pub async fn list_active_write_limit(&self) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
1670        let req = ListActiveWriteLimitRequest {};
1671        let resp = self.inner.list_active_write_limit(req).await?;
1672        Ok(resp.write_limits)
1673    }
1674
1675    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1676        let req = ListHummockMetaConfigRequest {};
1677        let resp = self.inner.list_hummock_meta_config(req).await?;
1678        Ok(resp.configs)
1679    }
1680
1681    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1682        let _resp = self
1683            .inner
1684            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1685            .await?;
1686
1687        Ok(())
1688    }
1689
1690    pub async fn rw_cloud_validate_source(
1691        &self,
1692        source_type: SourceType,
1693        source_config: HashMap<String, String>,
1694    ) -> Result<RwCloudValidateSourceResponse> {
1695        let req = RwCloudValidateSourceRequest {
1696            source_type: source_type.into(),
1697            source_config,
1698        };
1699        let resp = self.inner.rw_cloud_validate_source(req).await?;
1700        Ok(resp)
1701    }
1702
1703    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1704        self.inner.core.read().await.sink_coordinate_client.clone()
1705    }
1706
1707    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1708        let req = ListCompactTaskAssignmentRequest {};
1709        let resp = self.inner.list_compact_task_assignment(req).await?;
1710        Ok(resp.task_assignment)
1711    }
1712
1713    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1714        let req = ListEventLogRequest::default();
1715        let resp = self.inner.list_event_log(req).await?;
1716        Ok(resp.event_logs)
1717    }
1718
1719    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1720        let req = ListCompactTaskProgressRequest {};
1721        let resp = self.inner.list_compact_task_progress(req).await?;
1722        Ok(resp.task_progress)
1723    }
1724
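    /// No-op in madsim simulation builds (the non-madsim implementation below relies on
    /// spawning a real OS thread).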
1725    #[cfg(madsim)]
1726    pub fn try_add_panic_event_blocking(
1727        &self,
1728        _panic_info: impl Display,
1729        _timeout_millis: Option<u64>,
1730    ) {
1731    }
1732
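    /// Reports a worker-node-panic event to the meta event log, blocking the calling
    /// thread. A dedicated thread with its own current-thread runtime is used so that
    /// this can be invoked from non-async contexts, e.g. a panic hook.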
1733    /// If `timeout_millis` is `None`, a default of 1000 ms is used.
1734    #[cfg(not(madsim))]
1735    pub fn try_add_panic_event_blocking(
1736        &self,
1737        panic_info: impl Display,
1738        timeout_millis: Option<u64>,
1739    ) {
1740        let event = event_log::EventWorkerNodePanic {
1741            worker_id: self.worker_id,
1742            worker_type: self.worker_type.into(),
1743            host_addr: Some(self.host_addr.to_protobuf()),
1744            panic_info: format!("{panic_info}"),
1745        };
1746        let grpc_meta_client = self.inner.clone();
1747        let _ = thread::spawn(move || {
1748            let rt = tokio::runtime::Builder::new_current_thread()
1749                .enable_all()
1750                .build()
1751                .unwrap();
1752            let req = AddEventLogRequest {
1753                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1754            };
1755            rt.block_on(async {
1756                let _ = tokio::time::timeout(
1757                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1758                    grpc_meta_client.add_event_log(req),
1759                )
1760                .await;
1761            });
1762        })
1763        .join();
1764    }
1765
1766    pub async fn add_sink_fail_event(
1767        &self,
1768        sink_id: SinkId,
1769        sink_name: String,
1770        connector: String,
1771        error: String,
1772    ) -> Result<()> {
1773        let event = event_log::EventSinkFail {
1774            sink_id,
1775            sink_name,
1776            connector,
1777            error,
1778        };
1779        let req = AddEventLogRequest {
1780            event: Some(add_event_log_request::Event::SinkFail(event)),
1781        };
1782        self.inner.add_event_log(req).await?;
1783        Ok(())
1784    }
1785
1786    pub async fn add_cdc_auto_schema_change_fail_event(
1787        &self,
1788        source_id: SourceId,
1789        table_name: String,
1790        cdc_table_id: String,
1791        upstream_ddl: String,
1792        fail_info: String,
1793    ) -> Result<()> {
1794        let event = event_log::EventAutoSchemaChangeFail {
1795            table_id: source_id.as_cdc_table_id(),
1796            table_name,
1797            cdc_table_id,
1798            upstream_ddl,
1799            fail_info,
1800        };
1801        let req = AddEventLogRequest {
1802            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1803        };
1804        self.inner.add_event_log(req).await?;
1805        Ok(())
1806    }
1807
1808    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1809        let req = CancelCompactTaskRequest {
1810            task_id,
1811            task_status: task_status as _,
1812        };
1813        let resp = self.inner.cancel_compact_task(req).await?;
1814        Ok(resp.ret)
1815    }
1816
1817    pub async fn get_version_by_epoch(
1818        &self,
1819        epoch: HummockEpoch,
1820        table_id: TableId,
1821    ) -> Result<PbHummockVersion> {
1822        let req = GetVersionByEpochRequest { epoch, table_id };
1823        let resp = self.inner.get_version_by_epoch(req).await?;
1824        Ok(resp.version.unwrap())
1825    }
1826
1827    pub async fn get_cluster_limits(
1828        &self,
1829    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1830        let req = GetClusterLimitsRequest {};
1831        let resp = self.inner.get_cluster_limits(req).await?;
1832        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1833    }
1834
1835    pub async fn merge_compaction_group(
1836        &self,
1837        left_group_id: CompactionGroupId,
1838        right_group_id: CompactionGroupId,
1839    ) -> Result<()> {
1840        let req = MergeCompactionGroupRequest {
1841            left_group_id,
1842            right_group_id,
1843        };
1844        self.inner.merge_compaction_group(req).await?;
1845        Ok(())
1846    }
1847
1848    /// List all rate limits for sources and backfills.
1849    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1850        let request = ListRateLimitsRequest {};
1851        let resp = self.inner.list_rate_limits(request).await?;
1852        Ok(resp.rate_limits)
1853    }
1854
1855    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1856        let request = ListCdcProgressRequest {};
1857        let resp = self.inner.list_cdc_progress(request).await?;
1858        Ok(resp.cdc_progress)
1859    }
1860
1861    pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1862        let request = ListRefreshTableStatesRequest {};
1863        let resp = self.inner.list_refresh_table_states(request).await?;
1864        Ok(resp.states)
1865    }
1866
1867    pub async fn list_sink_log_store_tables(
1868        &self,
1869    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1870        let request = ListSinkLogStoreTablesRequest {};
1871        let resp = self.inner.list_sink_log_store_tables(request).await?;
1872        Ok(resp.tables)
1873    }
1874
1875    pub async fn create_iceberg_table(
1876        &self,
1877        table_job_info: PbTableJobInfo,
1878        sink_job_info: PbSinkJobInfo,
1879        iceberg_source: PbSource,
1880        if_not_exists: bool,
1881    ) -> Result<WaitVersion> {
1882        let request = CreateIcebergTableRequest {
1883            table_info: Some(table_job_info),
1884            sink_info: Some(sink_job_info),
1885            iceberg_source: Some(iceberg_source),
1886            if_not_exists,
1887        };
1888
1889        let resp = self.inner.create_iceberg_table(request).await?;
1890        Ok(resp
1891            .version
1892            .ok_or_else(|| anyhow!("wait version not set"))?)
1893    }
1894
1895    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1896        let request = ListIcebergTablesRequest {};
1897        let resp = self.inner.list_iceberg_tables(request).await?;
1898        Ok(resp.iceberg_tables)
1899    }
1900
1901    /// Get the await tree of all nodes in the cluster.
1902    pub async fn get_cluster_stack_trace(
1903        &self,
1904        actor_traces_format: ActorTracesFormat,
1905    ) -> Result<StackTraceResponse> {
1906        let request = StackTraceRequest {
1907            actor_traces_format: actor_traces_format as i32,
1908        };
1909        let resp = self.inner.stack_trace(request).await?;
1910        Ok(resp)
1911    }
1912
1913    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
1914        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
1915        self.inner.set_sync_log_store_aligned(request).await?;
1916        Ok(())
1917    }
1918
1919    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
1920        self.inner.refresh(request).await
1921    }
1922
1923    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
1924        let request = ListUnmigratedTablesRequest {};
1925        let resp = self.inner.list_unmigrated_tables(request).await?;
1926        Ok(resp
1927            .tables
1928            .into_iter()
1929            .map(|table| (table.table_id, table.table_name))
1930            .collect())
1931    }
1932}
1933
1934#[async_trait]
1935impl HummockMetaClient for MetaClient {
1936    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1937        let req = UnpinVersionBeforeRequest {
1938            context_id: self.worker_id(),
1939            unpin_version_before,
1940        };
1941        self.inner.unpin_version_before(req).await?;
1942        Ok(())
1943    }
1944
1945    async fn get_current_version(&self) -> Result<HummockVersion> {
1946        let req = GetCurrentVersionRequest::default();
1947        Ok(HummockVersion::from_rpc_protobuf(
1948            &self
1949                .inner
1950                .get_current_version(req)
1951                .await?
1952                .current_version
1953                .unwrap(),
1954        ))
1955    }
1956
1957    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1958        let resp = self
1959            .inner
1960            .get_new_object_ids(GetNewObjectIdsRequest { number })
1961            .await?;
1962        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1963    }
1964
1965    async fn commit_epoch_with_change_log(
1966        &self,
1967        _epoch: HummockEpoch,
1968        _sync_result: SyncResult,
1969        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1970    ) -> Result<()> {
1971        panic!("Only meta service can commit_epoch in production.")
1972    }
1973
1974    async fn trigger_manual_compaction(
1975        &self,
1976        compaction_group_id: CompactionGroupId,
1977        table_id: JobId,
1978        level: u32,
1979        sst_ids: Vec<HummockSstableId>,
1980    ) -> Result<()> {
1981        // TODO: support key_range parameter
1982        let req = TriggerManualCompactionRequest {
1983            compaction_group_id,
1984            table_id,
1985            // If the table_id does not exist, manual compaction will include all the SSTs
1986            // without checking internal_table_id.
1987            level,
1988            sst_ids,
1989            ..Default::default()
1990        };
1991
1992        self.inner.trigger_manual_compaction(req).await?;
1993        Ok(())
1994    }
1995
1996    async fn trigger_full_gc(
1997        &self,
1998        sst_retention_time_sec: u64,
1999        prefix: Option<String>,
2000    ) -> Result<()> {
2001        self.inner
2002            .trigger_full_gc(TriggerFullGcRequest {
2003                sst_retention_time_sec,
2004                prefix,
2005            })
2006            .await?;
2007        Ok(())
2008    }
2009
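    /// A minimal consumption sketch (illustrative only; `hummock_meta_client` and the
    /// compactor-side handling are assumptions):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let (tx, mut events) = hummock_meta_client.subscribe_compaction_event().await?;
    /// while let Some(event) = events.next().await {
    ///     // Handle the event and report task progress back through `tx` as needed.
    /// }
    /// ```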
2010    async fn subscribe_compaction_event(
2011        &self,
2012    ) -> Result<(
2013        UnboundedSender<SubscribeCompactionEventRequest>,
2014        BoxStream<'static, CompactionEventItem>,
2015    )> {
2016        let (request_sender, request_receiver) =
2017            unbounded_channel::<SubscribeCompactionEventRequest>();
2018        request_sender
2019            .send(SubscribeCompactionEventRequest {
2020                event: Some(subscribe_compaction_event_request::Event::Register(
2021                    Register {
2022                        context_id: self.worker_id(),
2023                    },
2024                )),
2025                create_at: SystemTime::now()
2026                    .duration_since(SystemTime::UNIX_EPOCH)
2027                    .expect("Clock may have gone backwards")
2028                    .as_millis() as u64,
2029            })
2030            .context("Failed to subscribe compaction event")?;
2031
2032        let stream = self
2033            .inner
2034            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
2035                request_receiver,
2036            )))
2037            .await?;
2038
2039        Ok((request_sender, Box::pin(stream)))
2040    }
2041
2042    async fn get_version_by_epoch(
2043        &self,
2044        epoch: HummockEpoch,
2045        table_id: TableId,
2046    ) -> Result<PbHummockVersion> {
2047        self.get_version_by_epoch(epoch, table_id).await
2048    }
2049
2050    async fn subscribe_iceberg_compaction_event(
2051        &self,
2052    ) -> Result<(
2053        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2054        BoxStream<'static, IcebergCompactionEventItem>,
2055    )> {
2056        let (request_sender, request_receiver) =
2057            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2058        request_sender
2059            .send(SubscribeIcebergCompactionEventRequest {
2060                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2061                    IcebergRegister {
2062                        context_id: self.worker_id(),
2063                    },
2064                )),
2065                create_at: SystemTime::now()
2066                    .duration_since(SystemTime::UNIX_EPOCH)
2067                    .expect("Clock may have gone backwards")
2068                    .as_millis() as u64,
2069            })
2070            .context("Failed to subscribe iceberg compaction event")?;
2071
2072        let stream = self
2073            .inner
2074            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2075                request_receiver,
2076            )))
2077            .await?;
2078
2079        Ok((request_sender, Box::pin(stream)))
2080    }
2081}
2082
2083#[async_trait]
2084impl TelemetryInfoFetcher for MetaClient {
2085    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2086        let resp = self
2087            .get_telemetry_info()
2088            .await
2089            .map_err(|e| e.to_report_string())?;
2090        let tracking_id = resp.get_tracking_id().ok();
2091        Ok(tracking_id.map(|id| id.to_owned()))
2092    }
2093}
2094
2095pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2096
2097#[derive(Debug, Clone)]
2098struct GrpcMetaClientCore {
2099    cluster_client: ClusterServiceClient<Channel>,
2100    meta_member_client: MetaMemberServiceClient<Channel>,
2101    heartbeat_client: HeartbeatServiceClient<Channel>,
2102    ddl_client: DdlServiceClient<Channel>,
2103    hummock_client: HummockManagerServiceClient<Channel>,
2104    notification_client: NotificationServiceClient<Channel>,
2105    stream_client: StreamManagerServiceClient<Channel>,
2106    user_client: UserServiceClient<Channel>,
2107    scale_client: ScaleServiceClient<Channel>,
2108    backup_client: BackupServiceClient<Channel>,
2109    telemetry_client: TelemetryInfoServiceClient<Channel>,
2110    system_params_client: SystemParamsServiceClient<Channel>,
2111    session_params_client: SessionParamServiceClient<Channel>,
2112    serving_client: ServingServiceClient<Channel>,
2113    cloud_client: CloudServiceClient<Channel>,
2114    sink_coordinate_client: SinkCoordinationRpcClient,
2115    event_log_client: EventLogServiceClient<Channel>,
2116    cluster_limit_client: ClusterLimitServiceClient<Channel>,
2117    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
2118    monitor_client: MonitorServiceClient<Channel>,
2119}
2120
2121impl GrpcMetaClientCore {
2122    pub(crate) fn new(channel: Channel) -> Self {
2123        let cluster_client = ClusterServiceClient::new(channel.clone());
2124        let meta_member_client = MetaMemberClient::new(channel.clone());
2125        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2126        let ddl_client =
2127            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2128        let hummock_client =
2129            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2130        let notification_client =
2131            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2132        let stream_client =
2133            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2134        let user_client = UserServiceClient::new(channel.clone());
2135        let scale_client =
2136            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2137        let backup_client = BackupServiceClient::new(channel.clone());
2138        let telemetry_client =
2139            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2140        let system_params_client = SystemParamsServiceClient::new(channel.clone());
2141        let session_params_client = SessionParamServiceClient::new(channel.clone());
2142        let serving_client = ServingServiceClient::new(channel.clone());
2143        let cloud_client = CloudServiceClient::new(channel.clone());
2144        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2145            .max_decoding_message_size(usize::MAX);
2146        let event_log_client = EventLogServiceClient::new(channel.clone());
2147        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2148        let hosted_iceberg_catalog_service_client =
2149            HostedIcebergCatalogServiceClient::new(channel.clone());
2150        let monitor_client = MonitorServiceClient::new(channel);
2151
2152        GrpcMetaClientCore {
2153            cluster_client,
2154            meta_member_client,
2155            heartbeat_client,
2156            ddl_client,
2157            hummock_client,
2158            notification_client,
2159            stream_client,
2160            user_client,
2161            scale_client,
2162            backup_client,
2163            telemetry_client,
2164            system_params_client,
2165            session_params_client,
2166            serving_client,
2167            cloud_client,
2168            sink_coordinate_client,
2169            event_log_client,
2170            cluster_limit_client,
2171            hosted_iceberg_catalog_service_client,
2172            monitor_client,
2173        }
2174    }
2175}
2176
2177/// Client to meta server. Cloning the instance is lightweight.
2178///
2179/// It is a wrapper of tonic client. See [`crate::meta_rpc_client_method_impl`].
2180#[derive(Debug, Clone)]
2181struct GrpcMetaClient {
2182    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
2183    core: Arc<RwLock<GrpcMetaClientCore>>,
2184}
2185
2186type MetaMemberClient = MetaMemberServiceClient<Channel>;
2187
2188struct MetaMemberGroup {
2189    members: LruCache<http::Uri, Option<MetaMemberClient>>,
2190}
2191
2192struct MetaMemberManagement {
2193    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
2194    members: Either<MetaMemberClient, MetaMemberGroup>,
2195    current_leader: http::Uri,
2196    meta_config: Arc<MetaConfig>,
2197}
2198
2199impl MetaMemberManagement {
2200    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2201
2202    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2203        format!("http://{}:{}", addr.host, addr.port)
2204            .parse()
2205            .unwrap()
2206    }
2207
2208    async fn recreate_core(&self, channel: Channel) {
2209        let mut core = self.core_ref.write().await;
2210        *core = GrpcMetaClientCore::new(channel);
2211    }
2212
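    /// Refreshes the known meta members and, if a new leader is discovered, reconnects
    /// the underlying channel and swaps it into the shared client core:
    /// - `Either::Left`: a single (load-balanced) member client is queried directly.
    /// - `Either::Right`: cached member clients are tried one by one until one of them
    ///   returns the member list.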
2213    async fn refresh_members(&mut self) -> Result<()> {
2214        let leader_addr = match self.members.as_mut() {
2215            Either::Left(client) => {
2216                let resp = client
2217                    .to_owned()
2218                    .members(MembersRequest {})
2219                    .await
2220                    .map_err(RpcError::from_meta_status)?;
2221                let resp = resp.into_inner();
2222                resp.members.into_iter().find(|member| member.is_leader)
2223            }
2224            Either::Right(member_group) => {
2225                let mut fetched_members = None;
2226
2227                for (addr, client) in &mut member_group.members {
2228                    let members: Result<_> = try {
2229                        let mut client = match client {
2230                            Some(cached_client) => cached_client.to_owned(),
2231                            None => {
2232                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2233                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2234                                    .await
2235                                    .context("failed to create client")?;
2236                                let new_client: MetaMemberClient =
2237                                    MetaMemberServiceClient::new(channel);
2238                                *client = Some(new_client.clone());
2239
2240                                new_client
2241                            }
2242                        };
2243
2244                        let resp = client
2245                            .members(MembersRequest {})
2246                            .await
2247                            .context("failed to fetch members")?;
2248
2249                        resp.into_inner().members
2250                    };
2251
2252                    let fetched = members.is_ok();
2253                    fetched_members = Some(members);
2254                    if fetched {
2255                        break;
2256                    }
2257                }
2258
2259                let members = fetched_members
2260                    .context("no member available in the list")?
2261                    .context("could not refresh members")?;
2262
2263                // find new leader
2264                let mut leader = None;
2265                for member in members {
2266                    if member.is_leader {
2267                        leader = Some(member.clone());
2268                    }
2269
2270                    let addr = Self::host_address_to_uri(member.address.unwrap());
2271                    // Expired addresses are intentionally kept here so they can still be retried in extreme situations.
2272                    if !member_group.members.contains(&addr) {
2273                        tracing::info!("new meta member joined: {}", addr);
2274                        member_group.members.put(addr, None);
2275                    }
2276                }
2277
2278                leader
2279            }
2280        };
2281
2282        if let Some(leader) = leader_addr {
2283            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2284
2285            if discovered_leader != self.current_leader {
2286                tracing::info!("new meta leader {} discovered", discovered_leader);
2287
2288                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2289                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2290                    false,
2291                );
2292
2293                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2294                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2295                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2296                })
2297                .await?;
2298
2299                self.recreate_core(channel).await;
2300                self.current_leader = discovered_leader;
2301            }
2302        }
2303
2304        Ok(())
2305    }
2306}
2307
2308impl GrpcMetaClient {
2309    // See `Endpoint::http2_keep_alive_interval`
2310    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2311    // See `Endpoint::keep_alive_timeout`
2312    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2313    // Retry base interval in ms for connecting to meta server.
2314    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2315    // Retry max interval in ms for connecting to meta server.
2316    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2317
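    /// Spawns a background task that refreshes the meta member list: periodically when a
    /// member group is used (`Either::Right`), and on demand whenever a force-refresh
    /// request arrives via `force_refresh_receiver`.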
2318    fn start_meta_member_monitor(
2319        &self,
2320        init_leader_addr: http::Uri,
2321        members: Either<MetaMemberClient, MetaMemberGroup>,
2322        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2323        meta_config: Arc<MetaConfig>,
2324    ) -> Result<()> {
2325        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2326        let current_leader = init_leader_addr;
2327
2328        let enable_period_tick = matches!(members, Either::Right(_));
2329
2330        let member_management = MetaMemberManagement {
2331            core_ref,
2332            members,
2333            current_leader,
2334            meta_config,
2335        };
2336
2337        let mut force_refresh_receiver = force_refresh_receiver;
2338
2339        tokio::spawn(async move {
2340            let mut member_management = member_management;
2341            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2342
2343            loop {
2344                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2345                    tokio::select! {
2346                        _ = ticker.tick() => None,
2347                        result_sender = force_refresh_receiver.recv() => {
2348                            if result_sender.is_none() {
2349                                break;
2350                            }
2351
2352                            result_sender
2353                        },
2354                    }
2355                } else {
2356                    let result_sender = force_refresh_receiver.recv().await;
2357
2358                    if result_sender.is_none() {
2359                        break;
2360                    }
2361
2362                    result_sender
2363                };
2364
2365                let tick_result = member_management.refresh_members().await;
2366                if let Err(e) = tick_result.as_ref() {
2367                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2368                }
2369
2370                if let Some(sender) = event {
2371                    // ignore resp
2372                    let _resp = sender.send(tick_result);
2373                }
2374            }
2375        });
2376
2377        Ok(())
2378    }
2379
2380    async fn force_refresh_leader(&self) -> Result<()> {
2381        let (sender, receiver) = oneshot::channel();
2382
2383        self.member_monitor_event_sender
2384            .send(sender)
2385            .await
2386            .map_err(|e| anyhow!(e))?;
2387
2388        receiver.await.map_err(|e| anyhow!(e))?
2389    }
2390
2391    /// Connect to the meta server according to the given address `strategy`.
2392    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2393        let (channel, addr) = match strategy {
2394            MetaAddressStrategy::LoadBalance(addr) => {
2395                Self::try_build_rpc_channel(vec![addr.clone()]).await
2396            }
2397            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2398        }?;
2399        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2400        let client = GrpcMetaClient {
2401            member_monitor_event_sender: force_refresh_sender,
2402            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2403        };
2404
2405        let meta_member_client = client.core.read().await.meta_member_client.clone();
2406        let members = match strategy {
2407            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2408            MetaAddressStrategy::List(addrs) => {
2409                let mut members = LruCache::new(20.try_into().unwrap());
2410                for addr in addrs {
2411                    members.put(addr.clone(), None);
2412                }
2413                members.put(addr.clone(), Some(meta_member_client));
2414
2415                Either::Right(MetaMemberGroup { members })
2416            }
2417        };
2418
2419        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2420
2421        client.force_refresh_leader().await?;
2422
2423        Ok(client)
2424    }
2425
2426    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2427        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2428    }
2429
2430    pub(crate) async fn try_build_rpc_channel(
2431        addrs: impl IntoIterator<Item = http::Uri>,
2432    ) -> Result<(Channel, http::Uri)> {
2433        let endpoints: Vec<_> = addrs
2434            .into_iter()
2435            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2436            .collect();
2437
2438        let mut last_error = None;
2439
2440        for (endpoint, addr) in endpoints {
2441            match Self::connect_to_endpoint(endpoint).await {
2442                Ok(channel) => {
2443                    tracing::info!("Connect to meta server {} successfully", addr);
2444                    return Ok((channel, addr));
2445                }
2446                Err(e) => {
2447                    tracing::warn!(
2448                        error = %e.as_report(),
2449                        "Failed to connect to meta server {}, trying again",
2450                        addr,
2451                    );
2452                    last_error = Some(e);
2453                }
2454            }
2455        }
2456
2457        if let Some(last_error) = last_error {
2458            Err(anyhow::anyhow!(last_error)
2459                .context("failed to connect to all meta servers")
2460                .into())
2461        } else {
2462            bail!("no meta server address provided")
2463        }
2464    }
2465
2466    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2467        let channel = endpoint
2468            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2469            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2470            .connect_timeout(Duration::from_secs(5))
2471            .monitored_connect("grpc-meta-client", Default::default())
2472            .await?
2473            .wrapped();
2474
2475        Ok(channel)
2476    }
2477
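    /// Builds a jittered exponential backoff (starting at `INIT_RETRY_BASE_INTERVAL_MS`,
    /// capped at `INIT_RETRY_MAX_INTERVAL_MS` per step) whose cumulative delay stays
    /// below `high_bound`; with `exceed` set, the step that crosses the bound is still
    /// yielded once.
    ///
    /// A minimal usage sketch (illustrative only; `connect` is an assumed async
    /// operation):
    ///
    /// ```ignore
    /// let strategy = GrpcMetaClient::retry_strategy_to_bound(Duration::from_secs(30), false);
    /// let channel = tokio_retry::Retry::spawn(strategy, || async { connect().await }).await?;
    /// ```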
2478    pub(crate) fn retry_strategy_to_bound(
2479        high_bound: Duration,
2480        exceed: bool,
2481    ) -> impl Iterator<Item = Duration> {
2482        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2483            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2484            .map(jitter);
2485
2486        let mut sum = Duration::default();
2487
2488        iter.take_while(move |duration| {
2489            sum += *duration;
2490
2491            if exceed {
2492                sum < high_bound + *duration
2493            } else {
2494                sum < high_bound
2495            }
2496        })
2497    }
2498}
2499
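// Each `{ client, method, Request, Response }` tuple below is expanded by
// `meta_rpc_client_method_impl` (see the invocation at the bottom of this file) into a
// delegating async method on `GrpcMetaClient`. Roughly, as a simplified sketch rather
// than the exact expansion, a generated method looks like:
//
//     pub async fn add_worker_node(&self, req: AddWorkerNodeRequest) -> Result<AddWorkerNodeResponse> {
//         let mut client = self.core.read().await.cluster_client.clone();
//         let resp = client.add_worker_node(req).await;
//         // On certain error codes the client may trigger `refresh_client_if_needed`
//         // before returning the error.
//         resp.map(|r| r.into_inner()).map_err(Into::into)
//     }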
2500macro_rules! for_all_meta_rpc {
2501    ($macro:ident) => {
2502        $macro! {
2503             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2504            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2505            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2506            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2507            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2508            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2509            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2510            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2511            ,{ stream_client, flush, FlushRequest, FlushResponse }
2512            ,{ stream_client, pause, PauseRequest, PauseResponse }
2513            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2514            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2515            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2516            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2517            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2518            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2519            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2520            ,{ stream_client, list_sink_log_store_tables, ListSinkLogStoreTablesRequest, ListSinkLogStoreTablesResponse }
2521            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2522            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2523            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2524            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2525            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
2526            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
2527            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2528            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2529            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
2530            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
2531            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2532            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2533            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
2534            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2535            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2536            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2537            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2538            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2539            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
2540            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
2541            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2542            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2543            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2544            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2545            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2546            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2547            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2548            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2549            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2550            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2551            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2552            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2553            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2554            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2555            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2556            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2557            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2558            ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
2559            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2560            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2561            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2562            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2563            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2564            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2565            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2566            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2567            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2568            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2569            ,{ ddl_client, risectl_resume_backfill, RisectlResumeBackfillRequest, RisectlResumeBackfillResponse }
2570            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2571            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2572            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2573            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2574            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2575            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2576            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2577            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2578            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2579            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2580            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2581            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
2582            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
2583            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2584            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2585            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2586            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2587            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2588            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2589            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2590            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2591            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2592            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2593            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2594            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2595            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2596            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2597            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2598            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2599            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2600            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2601            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2602            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2603            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2604            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2605            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2606            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2607            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2608            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2609            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2610            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2611            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2612            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2613            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2614            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2615            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2616            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2617            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2618            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2619            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2620            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2621            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2622            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2623            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2624            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2625            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2626            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2627            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2628            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2629            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2630            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2631            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2632            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2633            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2634            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2635            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2636            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2637            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2638            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2639        }
2640    };
2641}
2642
2643impl GrpcMetaClient {
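    /// Forces a member/leader refresh when an RPC fails with a status code that
    /// typically indicates the current leader is unreachable or has changed
    /// (`Unknown`, `Unimplemented`, `Unavailable`). If another refresh is already in
    /// flight, this one is skipped.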
2644    async fn refresh_client_if_needed(&self, code: Code) {
2645        if matches!(
2646            code,
2647            Code::Unknown | Code::Unimplemented | Code::Unavailable
2648        ) {
2649            tracing::debug!("received tonic code {}, refreshing meta client", code);
2650            let (result_sender, result_receiver) = oneshot::channel();
2651            if self
2652                .member_monitor_event_sender
2653                .try_send(result_sender)
2654                .is_ok()
2655            {
2656                if let Ok(Err(e)) = result_receiver.await {
2657                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2658                }
2659            } else {
2660                tracing::debug!("skipping this refresh; another refresh is already in progress");
2661            }
2662        }
2663    }
2664}
2665
2666impl GrpcMetaClient {
2667    for_all_meta_rpc! { meta_rpc_client_method_impl }
2668}