risingwave_rpc_client/meta_client.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::id::{
    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, ViewId, WorkerId,
};
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::id::{ActorId, FragmentId, SourceId};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

/// Client to meta server. Cloning the instance is lightweight.
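///
/// # Example (illustrative sketch)
///
/// A rough outline of how a handle might be shared across tasks, assuming an
/// already-registered `meta_client`; this is a hypothetical usage sketch, not a
/// tested snippet from this crate.
///
/// ```ignore
/// // Cloning only clones a few `Arc`s and small fields, so handles can be
/// // passed to spawned tasks freely.
/// let client = meta_client.clone();
/// tokio::spawn(async move {
///     let _nodes = client.list_worker_nodes(Some(WorkerType::ComputeNode)).await;
/// });
/// ```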
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: WorkerId,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> WorkerId {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

    /// Subscribe to notifications from the meta service.
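    ///
    /// # Example (illustrative sketch)
    ///
    /// A minimal sketch of consuming the notification stream; error handling is
    /// elided and `SubscribeType::Frontend` is just one possible subscriber kind.
    ///
    /// ```ignore
    /// let mut stream = meta_client.subscribe(SubscribeType::Frontend).await?;
    /// while let Some(resp) = stream.message().await? {
    ///     // React to catalog / worker / hummock notifications carried in `resp`.
    /// }
    /// ```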
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest { secret_id };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    /// Register the current node to the cluster and set the corresponding worker id.
    ///
    /// Retry if there's a connection issue with the meta node. Exit the process if the registration fails.
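    ///
    /// # Example (illustrative sketch)
    ///
    /// A hypothetical sketch of the typical node lifecycle built on this method;
    /// the address strategy, worker type, and property values are placeholders.
    ///
    /// ```ignore
    /// let (meta_client, system_params) = MetaClient::register_new(
    ///     meta_addr_strategy,          // e.g. parsed from the node's meta address option
    ///     WorkerType::ComputeNode,
    ///     &host_addr,
    ///     Property::default(),
    ///     meta_config.clone(),
    /// )
    /// .await;
    /// meta_client.activate(&host_addr).await?;
    /// // Keep the registration alive; see `start_heartbeat_loop` below.
    /// // On graceful shutdown:
    /// meta_client.try_unregister().await;
    /// ```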
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        // Retry until reaching `max_heartbeat_interval_secs`
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Only retry if there's any transient connection issue.
            // If the error is from our own implementation or business logic, do not retry it.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report panic event to meta node.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

    /// Activate the current node in the cluster to confirm it's ready to serve.
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

    /// Send heartbeat signal to meta service.
    pub async fn send_heartbeat(&self, node_id: WorkerId) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Ignore the error if we're already shutting down.
            // Otherwise, exit the process.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            resource_type: Some(PbStreamingJobResourceType {
                resource_type: Some(resource_type),
            }),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest { table_id, cascade };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id: job_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()> {
        let request = AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        };

        self.inner.alter_streaming_job_config(request).await?;
        Ok(())
    }

    pub async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()> {
        let request = AlterFragmentParallelismRequest {
            fragment_ids,
            parallelism,
        };

        self.inner.alter_fragment_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: JobId,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
            table_id,
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
        let request = ResetSourceRequest { source_id };
        let resp = self.inner.reset_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest { index_id, cascade };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    // TODO: use `UserInfoVersion` as the return type instead.
    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

    /// Unregister the current node from the cluster.
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    /// Try to unregister the current worker from the cluster with best effort. Log the result.
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = %self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = %self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[WorkerId],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

    /// Starts a heartbeat worker.
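    ///
    /// # Example (illustrative sketch)
    ///
    /// A hypothetical sketch of driving the loop and shutting it down; the
    /// interval value is a placeholder.
    ///
    /// ```ignore
    /// let (handle, shutdown) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
    /// // ... later, during graceful shutdown:
    /// let _ = shutdown.send(());
    /// handle.await.unwrap();
    /// ```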
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Wait for interval
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    // TODO: decide a better min_interval for the timeout
1089                    min_interval * 3,
1090                    meta_client.send_heartbeat(meta_client.worker_id()),
1091                )
1092                .await
1093                {
1094                    Ok(Ok(_)) => {}
1095                    Ok(Err(err)) => {
1096                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
1097                    }
1098                    Err(_) => {
1099                        tracing::warn!("Failed to send_heartbeat: timeout");
1100                    }
1101                }
1102            }
1103        });
1104        (join_handle, shutdown_tx)
1105    }
1106
1107    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1108        let request = RisectlListStateTablesRequest {};
1109        let resp = self.inner.risectl_list_state_tables(request).await?;
1110        Ok(resp.tables)
1111    }
1112
1113    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1114        let request = FlushRequest { database_id };
1115        let resp = self.inner.flush(request).await?;
1116        Ok(HummockVersionId::new(resp.hummock_version_id))
1117    }
1118
1119    pub async fn wait(&self) -> Result<()> {
1120        let request = WaitRequest {};
1121        self.inner.wait(request).await?;
1122        Ok(())
1123    }
1124
1125    pub async fn recover(&self) -> Result<()> {
1126        let request = RecoverRequest {};
1127        self.inner.recover(request).await?;
1128        Ok(())
1129    }
1130
1131    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1132        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1133        let resp = self.inner.cancel_creating_jobs(request).await?;
1134        Ok(resp.canceled_jobs)
1135    }
1136
1137    pub async fn list_table_fragments(
1138        &self,
1139        job_ids: &[JobId],
1140    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
1141        let request = ListTableFragmentsRequest {
1142            table_ids: job_ids.to_vec(),
1143        };
1144        let resp = self.inner.list_table_fragments(request).await?;
1145        Ok(resp.table_fragments)
1146    }
1147
1148    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1149        let resp = self
1150            .inner
1151            .list_streaming_job_states(ListStreamingJobStatesRequest {})
1152            .await?;
1153        Ok(resp.states)
1154    }
1155
1156    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
1157        let resp = self
1158            .inner
1159            .list_fragment_distribution(ListFragmentDistributionRequest {})
1160            .await?;
1161        Ok(resp.distributions)
1162    }
1163
1164    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
1165        let resp = self
1166            .inner
1167            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
1168            .await?;
1169        Ok(resp.distributions)
1170    }
1171
1172    pub async fn get_fragment_by_id(
1173        &self,
1174        fragment_id: FragmentId,
1175    ) -> Result<Option<FragmentDistribution>> {
1176        let resp = self
1177            .inner
1178            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1179            .await?;
1180        Ok(resp.distribution)
1181    }
1182
1183    pub async fn get_fragment_vnodes(
1184        &self,
1185        fragment_id: FragmentId,
1186    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
1187        let resp = self
1188            .inner
1189            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
1190            .await?;
1191        Ok(resp
1192            .actor_vnodes
1193            .into_iter()
1194            .map(|actor| (actor.actor_id, actor.vnode_indices))
1195            .collect())
1196    }
1197
1198    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
1199        let resp = self
1200            .inner
1201            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
1202            .await?;
1203        Ok(resp.vnode_indices)
1204    }
1205
1206    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1207        let resp = self
1208            .inner
1209            .list_actor_states(ListActorStatesRequest {})
1210            .await?;
1211        Ok(resp.states)
1212    }
1213
1214    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1215        let resp = self
1216            .inner
1217            .list_actor_splits(ListActorSplitsRequest {})
1218            .await?;
1219
1220        Ok(resp.actor_splits)
1221    }
1222
1223    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
1224        let resp = self
1225            .inner
1226            .list_object_dependencies(ListObjectDependenciesRequest {})
1227            .await?;
1228        Ok(resp.dependencies)
1229    }
1230
1231    pub async fn pause(&self) -> Result<PauseResponse> {
1232        let request = PauseRequest {};
1233        let resp = self.inner.pause(request).await?;
1234        Ok(resp)
1235    }
1236
1237    pub async fn resume(&self) -> Result<ResumeResponse> {
1238        let request = ResumeRequest {};
1239        let resp = self.inner.resume(request).await?;
1240        Ok(resp)
1241    }
1242
1243    pub async fn apply_throttle(
1244        &self,
1245        kind: PbThrottleTarget,
1246        id: u32,
1247        rate: Option<u32>,
1248    ) -> Result<ApplyThrottleResponse> {
1249        let request = ApplyThrottleRequest {
1250            kind: kind as i32,
1251            id,
1252            rate,
1253        };
1254        let resp = self.inner.apply_throttle(request).await?;
1255        Ok(resp)
1256    }
1257
1258    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1259        let resp = self
1260            .inner
1261            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1262            .await?;
1263        Ok(resp.get_status().unwrap())
1264    }
1265
1266    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1267        let request = GetClusterInfoRequest {};
1268        let resp = self.inner.get_cluster_info(request).await?;
1269        Ok(resp)
1270    }
1271
1272    pub async fn reschedule(
1273        &self,
1274        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1275        revision: u64,
1276        resolve_no_shuffle_upstream: bool,
1277    ) -> Result<(bool, u64)> {
1278        let request = RescheduleRequest {
1279            revision,
1280            resolve_no_shuffle_upstream,
1281            worker_reschedules,
1282        };
1283        let resp = self.inner.reschedule(request).await?;
1284        Ok((resp.success, resp.revision))
1285    }
1286
1287    pub async fn risectl_get_pinned_versions_summary(
1288        &self,
1289    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1290        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1291        self.inner
1292            .rise_ctl_get_pinned_versions_summary(request)
1293            .await
1294    }
1295
1296    pub async fn risectl_get_checkpoint_hummock_version(
1297        &self,
1298    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1299        let request = RiseCtlGetCheckpointVersionRequest {};
1300        self.inner.rise_ctl_get_checkpoint_version(request).await
1301    }
1302
1303    pub async fn risectl_pause_hummock_version_checkpoint(
1304        &self,
1305    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1306        let request = RiseCtlPauseVersionCheckpointRequest {};
1307        self.inner.rise_ctl_pause_version_checkpoint(request).await
1308    }
1309
1310    pub async fn risectl_resume_hummock_version_checkpoint(
1311        &self,
1312    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1313        let request = RiseCtlResumeVersionCheckpointRequest {};
1314        self.inner.rise_ctl_resume_version_checkpoint(request).await
1315    }
1316
1317    pub async fn init_metadata_for_replay(
1318        &self,
1319        tables: Vec<PbTable>,
1320        compaction_groups: Vec<CompactionGroupInfo>,
1321    ) -> Result<()> {
1322        let req = InitMetadataForReplayRequest {
1323            tables,
1324            compaction_groups,
1325        };
1326        let _resp = self.inner.init_metadata_for_replay(req).await?;
1327        Ok(())
1328    }
1329
1330    pub async fn replay_version_delta(
1331        &self,
1332        version_delta: HummockVersionDelta,
1333    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1334        let req = ReplayVersionDeltaRequest {
1335            version_delta: Some(version_delta.into()),
1336        };
1337        let resp = self.inner.replay_version_delta(req).await?;
1338        Ok((
1339            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1340            resp.modified_compaction_groups,
1341        ))
1342    }
1343
1344    pub async fn list_version_deltas(
1345        &self,
1346        start_id: HummockVersionId,
1347        num_limit: u32,
1348        committed_epoch_limit: HummockEpoch,
1349    ) -> Result<Vec<HummockVersionDelta>> {
1350        let req = ListVersionDeltasRequest {
1351            start_id: start_id.to_u64(),
1352            num_limit,
1353            committed_epoch_limit,
1354        };
1355        Ok(self
1356            .inner
1357            .list_version_deltas(req)
1358            .await?
1359            .version_deltas
1360            .unwrap()
1361            .version_deltas
1362            .iter()
1363            .map(HummockVersionDelta::from_rpc_protobuf)
1364            .collect())
1365    }
1366
1367    pub async fn trigger_compaction_deterministic(
1368        &self,
1369        version_id: HummockVersionId,
1370        compaction_groups: Vec<CompactionGroupId>,
1371    ) -> Result<()> {
1372        let req = TriggerCompactionDeterministicRequest {
1373            version_id: version_id.to_u64(),
1374            compaction_groups,
1375        };
1376        self.inner.trigger_compaction_deterministic(req).await?;
1377        Ok(())
1378    }
1379
1380    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1381        let req = DisableCommitEpochRequest {};
1382        Ok(HummockVersion::from_rpc_protobuf(
1383            &self
1384                .inner
1385                .disable_commit_epoch(req)
1386                .await?
1387                .current_version
1388                .unwrap(),
1389        ))
1390    }
1391
1392    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1393        let req = GetAssignedCompactTaskNumRequest {};
1394        let resp = self.inner.get_assigned_compact_task_num(req).await?;
1395        Ok(resp.num_tasks as usize)
1396    }
1397
1398    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1399        let req = RiseCtlListCompactionGroupRequest {};
1400        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1401        Ok(resp.compaction_groups)
1402    }
1403
1404    pub async fn risectl_update_compaction_config(
1405        &self,
1406        compaction_groups: &[CompactionGroupId],
1407        configs: &[MutableConfig],
1408    ) -> Result<()> {
1409        let req = RiseCtlUpdateCompactionConfigRequest {
1410            compaction_group_ids: compaction_groups.to_vec(),
1411            configs: configs
1412                .iter()
1413                .map(
1414                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1415                        mutable_config: Some(c.clone()),
1416                    },
1417                )
1418                .collect(),
1419        };
1420        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1421        Ok(())
1422    }
1423
1424    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1425        let req = BackupMetaRequest { remarks };
1426        let resp = self.inner.backup_meta(req).await?;
1427        Ok(resp.job_id)
1428    }
1429
1430    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1431        let req = GetBackupJobStatusRequest { job_id };
1432        let resp = self.inner.get_backup_job_status(req).await?;
1433        Ok((resp.job_status(), resp.message))
1434    }
1435
1436    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1437        let req = DeleteMetaSnapshotRequest {
1438            snapshot_ids: snapshot_ids.to_vec(),
1439        };
1440        let _resp = self.inner.delete_meta_snapshot(req).await?;
1441        Ok(())
1442    }
1443
1444    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1445        let req = GetMetaSnapshotManifestRequest {};
1446        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1447        Ok(resp.manifest.expect("should exist"))
1448    }
1449
1450    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1451        let req = GetTelemetryInfoRequest {};
1452        let resp = self.inner.get_telemetry_info(req).await?;
1453        Ok(resp)
1454    }
1455
1456    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1457        let req = GetMetaStoreInfoRequest {};
1458        let resp = self.inner.get_meta_store_info(req).await?;
1459        Ok(resp.meta_store_endpoint)
1460    }
1461
1462    pub async fn alter_sink_props(
1463        &self,
1464        sink_id: SinkId,
1465        changed_props: BTreeMap<String, String>,
1466        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1467        connector_conn_ref: Option<ConnectionId>,
1468    ) -> Result<()> {
1469        let req = AlterConnectorPropsRequest {
1470            object_id: sink_id.as_raw_id(),
1471            changed_props: changed_props.into_iter().collect(),
1472            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1473            connector_conn_ref,
1474            object_type: AlterConnectorPropsObject::Sink as i32,
1475            extra_options: None,
1476        };
1477        let _resp = self.inner.alter_connector_props(req).await?;
1478        Ok(())
1479    }
1480
1481    pub async fn alter_iceberg_table_props(
1482        &self,
1483        table_id: TableId,
1484        sink_id: SinkId,
1485        source_id: SourceId,
1486        changed_props: BTreeMap<String, String>,
1487        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1488        connector_conn_ref: Option<ConnectionId>,
1489    ) -> Result<()> {
1490        let req = AlterConnectorPropsRequest {
1491            object_id: table_id.as_raw_id(),
1492            changed_props: changed_props.into_iter().collect(),
1493            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1494            connector_conn_ref,
1495            object_type: AlterConnectorPropsObject::IcebergTable as i32,
1496            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1497                sink_id,
1498                source_id,
1499            })),
1500        };
1501        let _resp = self.inner.alter_connector_props(req).await?;
1502        Ok(())
1503    }
1504
1505    pub async fn alter_source_connector_props(
1506        &self,
1507        source_id: SourceId,
1508        changed_props: BTreeMap<String, String>,
1509        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1510        connector_conn_ref: Option<ConnectionId>,
1511    ) -> Result<()> {
1512        let req = AlterConnectorPropsRequest {
1513            object_id: source_id.as_raw_id(),
1514            changed_props: changed_props.into_iter().collect(),
1515            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1516            connector_conn_ref,
1517            object_type: AlterConnectorPropsObject::Source as i32,
1518            extra_options: None,
1519        };
1520        let _resp = self.inner.alter_connector_props(req).await?;
1521        Ok(())
1522    }
1523
1524    pub async fn alter_connection_connector_props(
1525        &self,
1526        connection_id: u32,
1527        changed_props: BTreeMap<String, String>,
1528        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1529    ) -> Result<()> {
1530        let req = AlterConnectorPropsRequest {
1531            object_id: connection_id,
1532            changed_props: changed_props.into_iter().collect(),
1533            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1534            connector_conn_ref: None, // Connections don't reference other connections
1535            object_type: AlterConnectorPropsObject::Connection as i32,
1536            extra_options: None,
1537        };
1538        let _resp = self.inner.alter_connector_props(req).await?;
1539        Ok(())
1540    }
1541
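    /// Set a system parameter on the meta node, returning the updated system params
    /// reader if the meta node includes it in the response.
    ///
    /// A minimal usage sketch (illustrative only; parameter names follow the system
    /// parameters exposed by the cluster):
    ///
    /// ```ignore
    /// let params = client
    ///     .set_system_param("checkpoint_frequency".to_owned(), Some("10".to_owned()))
    ///     .await?;
    /// ```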
1542    pub async fn set_system_param(
1543        &self,
1544        param: String,
1545        value: Option<String>,
1546    ) -> Result<Option<SystemParamsReader>> {
1547        let req = SetSystemParamRequest { param, value };
1548        let resp = self.inner.set_system_param(req).await?;
1549        Ok(resp.params.map(SystemParamsReader::from))
1550    }
1551
1552    pub async fn get_session_params(&self) -> Result<String> {
1553        let req = GetSessionParamsRequest {};
1554        let resp = self.inner.get_session_params(req).await?;
1555        Ok(resp.params)
1556    }
1557
1558    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1559        let req = SetSessionParamRequest { param, value };
1560        let resp = self.inner.set_session_param(req).await?;
1561        Ok(resp.param)
1562    }
1563
1564    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1565        let req = GetDdlProgressRequest {};
1566        let resp = self.inner.get_ddl_progress(req).await?;
1567        Ok(resp.ddl_progress)
1568    }
1569
1570    pub async fn split_compaction_group(
1571        &self,
1572        group_id: CompactionGroupId,
1573        table_ids_to_new_group: &[TableId],
1574        partition_vnode_count: u32,
1575    ) -> Result<CompactionGroupId> {
1576        let req = SplitCompactionGroupRequest {
1577            group_id,
1578            table_ids: table_ids_to_new_group.to_vec(),
1579            partition_vnode_count,
1580        };
1581        let resp = self.inner.split_compaction_group(req).await?;
1582        Ok(resp.new_group_id)
1583    }
1584
1585    pub async fn get_tables(
1586        &self,
1587        table_ids: Vec<TableId>,
1588        include_dropped_tables: bool,
1589    ) -> Result<HashMap<TableId, Table>> {
1590        let req = GetTablesRequest {
1591            table_ids,
1592            include_dropped_tables,
1593        };
1594        let resp = self.inner.get_tables(req).await?;
1595        Ok(resp.tables)
1596    }
1597
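    /// Fetch the serving vnode mappings, returning a map from fragment id to the owning
    /// table id (0 if the fragment is missing from `fragment_to_table`) and its
    /// `WorkerSlotMapping`.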
1598    pub async fn list_serving_vnode_mappings(
1599        &self,
1600    ) -> Result<HashMap<FragmentId, (u32, WorkerSlotMapping)>> {
1601        let req = GetServingVnodeMappingsRequest {};
1602        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1603        let mappings = resp
1604            .worker_slot_mappings
1605            .into_iter()
1606            .map(|p| {
1607                (
1608                    p.fragment_id,
1609                    (
1610                        resp.fragment_to_table
1611                            .get(&p.fragment_id)
1612                            .cloned()
1613                            .unwrap_or(0),
1614                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1615                    ),
1616                )
1617            })
1618            .collect();
1619        Ok(mappings)
1620    }
1621
1622    pub async fn risectl_list_compaction_status(
1623        &self,
1624    ) -> Result<(
1625        Vec<CompactStatus>,
1626        Vec<CompactTaskAssignment>,
1627        Vec<CompactTaskProgress>,
1628    )> {
1629        let req = RiseCtlListCompactionStatusRequest {};
1630        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1631        Ok((
1632            resp.compaction_statuses,
1633            resp.task_assignment,
1634            resp.task_progress,
1635        ))
1636    }
1637
1638    pub async fn get_compaction_score(
1639        &self,
1640        compaction_group_id: CompactionGroupId,
1641    ) -> Result<Vec<PickerInfo>> {
1642        let req = GetCompactionScoreRequest {
1643            compaction_group_id,
1644        };
1645        let resp = self.inner.get_compaction_score(req).await?;
1646        Ok(resp.scores)
1647    }
1648
1649    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1650        let req = RiseCtlRebuildTableStatsRequest {};
1651        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1652        Ok(())
1653    }
1654
1655    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1656        let req = ListBranchedObjectRequest {};
1657        let resp = self.inner.list_branched_object(req).await?;
1658        Ok(resp.branched_objects)
1659    }
1660
1661    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
1662        let req = ListActiveWriteLimitRequest {};
1663        let resp = self.inner.list_active_write_limit(req).await?;
1664        Ok(resp.write_limits)
1665    }
1666
1667    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1668        let req = ListHummockMetaConfigRequest {};
1669        let resp = self.inner.list_hummock_meta_config(req).await?;
1670        Ok(resp.configs)
1671    }
1672
1673    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1674        let _resp = self
1675            .inner
1676            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1677            .await?;
1678
1679        Ok(())
1680    }
1681
1682    pub async fn rw_cloud_validate_source(
1683        &self,
1684        source_type: SourceType,
1685        source_config: HashMap<String, String>,
1686    ) -> Result<RwCloudValidateSourceResponse> {
1687        let req = RwCloudValidateSourceRequest {
1688            source_type: source_type.into(),
1689            source_config,
1690        };
1691        let resp = self.inner.rw_cloud_validate_source(req).await?;
1692        Ok(resp)
1693    }
1694
1695    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1696        self.inner.core.read().await.sink_coordinate_client.clone()
1697    }
1698
1699    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1700        let req = ListCompactTaskAssignmentRequest {};
1701        let resp = self.inner.list_compact_task_assignment(req).await?;
1702        Ok(resp.task_assignment)
1703    }
1704
1705    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1706        let req = ListEventLogRequest::default();
1707        let resp = self.inner.list_event_log(req).await?;
1708        Ok(resp.event_logs)
1709    }
1710
1711    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1712        let req = ListCompactTaskProgressRequest {};
1713        let resp = self.inner.list_compact_task_progress(req).await?;
1714        Ok(resp.task_progress)
1715    }
1716
1717    #[cfg(madsim)]
1718    pub fn try_add_panic_event_blocking(
1719        &self,
1720        _panic_info: impl Display,
1721        _timeout_millis: Option<u64>,
1722    ) {
1723    }
1724
1725    /// If `timeout_millis` is `None`, a default of 1000 ms is used.
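    ///
    /// This runs on a freshly spawned thread with its own current-thread Tokio runtime and
    /// blocks until the event is sent or the timeout elapses, so it can be called from a
    /// non-async context such as a panic hook.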
1726    #[cfg(not(madsim))]
1727    pub fn try_add_panic_event_blocking(
1728        &self,
1729        panic_info: impl Display,
1730        timeout_millis: Option<u64>,
1731    ) {
1732        let event = event_log::EventWorkerNodePanic {
1733            worker_id: self.worker_id,
1734            worker_type: self.worker_type.into(),
1735            host_addr: Some(self.host_addr.to_protobuf()),
1736            panic_info: format!("{panic_info}"),
1737        };
1738        let grpc_meta_client = self.inner.clone();
1739        let _ = thread::spawn(move || {
1740            let rt = tokio::runtime::Builder::new_current_thread()
1741                .enable_all()
1742                .build()
1743                .unwrap();
1744            let req = AddEventLogRequest {
1745                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1746            };
1747            rt.block_on(async {
1748                let _ = tokio::time::timeout(
1749                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1750                    grpc_meta_client.add_event_log(req),
1751                )
1752                .await;
1753            });
1754        })
1755        .join();
1756    }
1757
1758    pub async fn add_sink_fail_event(
1759        &self,
1760        sink_id: SinkId,
1761        sink_name: String,
1762        connector: String,
1763        error: String,
1764    ) -> Result<()> {
1765        let event = event_log::EventSinkFail {
1766            sink_id,
1767            sink_name,
1768            connector,
1769            error,
1770        };
1771        let req = AddEventLogRequest {
1772            event: Some(add_event_log_request::Event::SinkFail(event)),
1773        };
1774        self.inner.add_event_log(req).await?;
1775        Ok(())
1776    }
1777
1778    pub async fn add_cdc_auto_schema_change_fail_event(
1779        &self,
1780        source_id: SourceId,
1781        table_name: String,
1782        cdc_table_id: String,
1783        upstream_ddl: String,
1784        fail_info: String,
1785    ) -> Result<()> {
1786        let event = event_log::EventAutoSchemaChangeFail {
1787            table_id: source_id.as_cdc_table_id(),
1788            table_name,
1789            cdc_table_id,
1790            upstream_ddl,
1791            fail_info,
1792        };
1793        let req = AddEventLogRequest {
1794            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1795        };
1796        self.inner.add_event_log(req).await?;
1797        Ok(())
1798    }
1799
1800    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1801        let req = CancelCompactTaskRequest {
1802            task_id,
1803            task_status: task_status as _,
1804        };
1805        let resp = self.inner.cancel_compact_task(req).await?;
1806        Ok(resp.ret)
1807    }
1808
1809    pub async fn get_version_by_epoch(
1810        &self,
1811        epoch: HummockEpoch,
1812        table_id: TableId,
1813    ) -> Result<PbHummockVersion> {
1814        let req = GetVersionByEpochRequest { epoch, table_id };
1815        let resp = self.inner.get_version_by_epoch(req).await?;
1816        Ok(resp.version.unwrap())
1817    }
1818
1819    pub async fn get_cluster_limits(
1820        &self,
1821    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1822        let req = GetClusterLimitsRequest {};
1823        let resp = self.inner.get_cluster_limits(req).await?;
1824        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1825    }
1826
1827    pub async fn merge_compaction_group(
1828        &self,
1829        left_group_id: CompactionGroupId,
1830        right_group_id: CompactionGroupId,
1831    ) -> Result<()> {
1832        let req = MergeCompactionGroupRequest {
1833            left_group_id,
1834            right_group_id,
1835        };
1836        self.inner.merge_compaction_group(req).await?;
1837        Ok(())
1838    }
1839
1840    /// List all rate limits for sources and backfills
1841    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1842        let request = ListRateLimitsRequest {};
1843        let resp = self.inner.list_rate_limits(request).await?;
1844        Ok(resp.rate_limits)
1845    }
1846
1847    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1848        let request = ListCdcProgressRequest {};
1849        let resp = self.inner.list_cdc_progress(request).await?;
1850        Ok(resp.cdc_progress)
1851    }
1852
1853    pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1854        let request = ListRefreshTableStatesRequest {};
1855        let resp = self.inner.list_refresh_table_states(request).await?;
1856        Ok(resp.states)
1857    }
1858
1859    pub async fn create_iceberg_table(
1860        &self,
1861        table_job_info: PbTableJobInfo,
1862        sink_job_info: PbSinkJobInfo,
1863        iceberg_source: PbSource,
1864        if_not_exists: bool,
1865    ) -> Result<WaitVersion> {
1866        let request = CreateIcebergTableRequest {
1867            table_info: Some(table_job_info),
1868            sink_info: Some(sink_job_info),
1869            iceberg_source: Some(iceberg_source),
1870            if_not_exists,
1871        };
1872
1873        let resp = self.inner.create_iceberg_table(request).await?;
1874        Ok(resp
1875            .version
1876            .ok_or_else(|| anyhow!("wait version not set"))?)
1877    }
1878
1879    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1880        let request = ListIcebergTablesRequest {};
1881        let resp = self.inner.list_iceberg_tables(request).await?;
1882        Ok(resp.iceberg_tables)
1883    }
1884
1885    /// Get the await tree of all nodes in the cluster.
1886    pub async fn get_cluster_stack_trace(
1887        &self,
1888        actor_traces_format: ActorTracesFormat,
1889    ) -> Result<StackTraceResponse> {
1890        let request = StackTraceRequest {
1891            actor_traces_format: actor_traces_format as i32,
1892        };
1893        let resp = self.inner.stack_trace(request).await?;
1894        Ok(resp)
1895    }
1896
1897    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
1898        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
1899        self.inner.set_sync_log_store_aligned(request).await?;
1900        Ok(())
1901    }
1902
1903    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
1904        self.inner.refresh(request).await
1905    }
1906
1907    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
1908        let request = ListUnmigratedTablesRequest {};
1909        let resp = self.inner.list_unmigrated_tables(request).await?;
1910        Ok(resp
1911            .tables
1912            .into_iter()
1913            .map(|table| (table.table_id, table.table_name))
1914            .collect())
1915    }
1916}
1917
1918#[async_trait]
1919impl HummockMetaClient for MetaClient {
1920    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1921        let req = UnpinVersionBeforeRequest {
1922            context_id: self.worker_id(),
1923            unpin_version_before: unpin_version_before.to_u64(),
1924        };
1925        self.inner.unpin_version_before(req).await?;
1926        Ok(())
1927    }
1928
1929    async fn get_current_version(&self) -> Result<HummockVersion> {
1930        let req = GetCurrentVersionRequest::default();
1931        Ok(HummockVersion::from_rpc_protobuf(
1932            &self
1933                .inner
1934                .get_current_version(req)
1935                .await?
1936                .current_version
1937                .unwrap(),
1938        ))
1939    }
1940
1941    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1942        let resp = self
1943            .inner
1944            .get_new_object_ids(GetNewObjectIdsRequest { number })
1945            .await?;
1946        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1947    }
1948
1949    async fn commit_epoch_with_change_log(
1950        &self,
1951        _epoch: HummockEpoch,
1952        _sync_result: SyncResult,
1953        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1954    ) -> Result<()> {
1955        panic!("Only meta service can commit_epoch in production.")
1956    }
1957
1958    async fn trigger_manual_compaction(
1959        &self,
1960        compaction_group_id: u64,
1961        table_id: JobId,
1962        level: u32,
1963        sst_ids: Vec<u64>,
1964    ) -> Result<()> {
1965        // TODO: support key_range parameter
1966        let req = TriggerManualCompactionRequest {
1967            compaction_group_id,
1968            table_id,
1969            // If `table_id` does not exist, manual compaction will include all the SSTs
1970            // without checking `internal_table_id`.
1971            level,
1972            sst_ids,
1973            ..Default::default()
1974        };
1975
1976        self.inner.trigger_manual_compaction(req).await?;
1977        Ok(())
1978    }
1979
1980    async fn trigger_full_gc(
1981        &self,
1982        sst_retention_time_sec: u64,
1983        prefix: Option<String>,
1984    ) -> Result<()> {
1985        self.inner
1986            .trigger_full_gc(TriggerFullGcRequest {
1987                sst_retention_time_sec,
1988                prefix,
1989            })
1990            .await?;
1991        Ok(())
1992    }
1993
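    /// Subscribe to compaction events from the meta node.
    ///
    /// The returned sender carries further `SubscribeCompactionEventRequest`s to the meta
    /// node (an initial `Register` event is sent here on the caller's behalf), while the
    /// returned stream yields the corresponding compaction event items.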
1994    async fn subscribe_compaction_event(
1995        &self,
1996    ) -> Result<(
1997        UnboundedSender<SubscribeCompactionEventRequest>,
1998        BoxStream<'static, CompactionEventItem>,
1999    )> {
2000        let (request_sender, request_receiver) =
2001            unbounded_channel::<SubscribeCompactionEventRequest>();
2002        request_sender
2003            .send(SubscribeCompactionEventRequest {
2004                event: Some(subscribe_compaction_event_request::Event::Register(
2005                    Register {
2006                        context_id: self.worker_id(),
2007                    },
2008                )),
2009                create_at: SystemTime::now()
2010                    .duration_since(SystemTime::UNIX_EPOCH)
2011                    .expect("Clock may have gone backwards")
2012                    .as_millis() as u64,
2013            })
2014            .context("Failed to subscribe compaction event")?;
2015
2016        let stream = self
2017            .inner
2018            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
2019                request_receiver,
2020            )))
2021            .await?;
2022
2023        Ok((request_sender, Box::pin(stream)))
2024    }
2025
2026    async fn get_version_by_epoch(
2027        &self,
2028        epoch: HummockEpoch,
2029        table_id: TableId,
2030    ) -> Result<PbHummockVersion> {
2031        self.get_version_by_epoch(epoch, table_id).await
2032    }
2033
2034    async fn subscribe_iceberg_compaction_event(
2035        &self,
2036    ) -> Result<(
2037        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2038        BoxStream<'static, IcebergCompactionEventItem>,
2039    )> {
2040        let (request_sender, request_receiver) =
2041            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2042        request_sender
2043            .send(SubscribeIcebergCompactionEventRequest {
2044                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2045                    IcebergRegister {
2046                        context_id: self.worker_id(),
2047                    },
2048                )),
2049                create_at: SystemTime::now()
2050                    .duration_since(SystemTime::UNIX_EPOCH)
2051                    .expect("Clock may have gone backwards")
2052                    .as_millis() as u64,
2053            })
2054            .context("Failed to subscribe iceberg compaction event")?;
2055
2056        let stream = self
2057            .inner
2058            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2059                request_receiver,
2060            )))
2061            .await?;
2062
2063        Ok((request_sender, Box::pin(stream)))
2064    }
2065}
2066
2067#[async_trait]
2068impl TelemetryInfoFetcher for MetaClient {
2069    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2070        let resp = self
2071            .get_telemetry_info()
2072            .await
2073            .map_err(|e| e.to_report_string())?;
2074        let tracking_id = resp.get_tracking_id().ok();
2075        Ok(tracking_id.map(|id| id.to_owned()))
2076    }
2077}
2078
2079pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2080
2081#[derive(Debug, Clone)]
2082struct GrpcMetaClientCore {
2083    cluster_client: ClusterServiceClient<Channel>,
2084    meta_member_client: MetaMemberServiceClient<Channel>,
2085    heartbeat_client: HeartbeatServiceClient<Channel>,
2086    ddl_client: DdlServiceClient<Channel>,
2087    hummock_client: HummockManagerServiceClient<Channel>,
2088    notification_client: NotificationServiceClient<Channel>,
2089    stream_client: StreamManagerServiceClient<Channel>,
2090    user_client: UserServiceClient<Channel>,
2091    scale_client: ScaleServiceClient<Channel>,
2092    backup_client: BackupServiceClient<Channel>,
2093    telemetry_client: TelemetryInfoServiceClient<Channel>,
2094    system_params_client: SystemParamsServiceClient<Channel>,
2095    session_params_client: SessionParamServiceClient<Channel>,
2096    serving_client: ServingServiceClient<Channel>,
2097    cloud_client: CloudServiceClient<Channel>,
2098    sink_coordinate_client: SinkCoordinationRpcClient,
2099    event_log_client: EventLogServiceClient<Channel>,
2100    cluster_limit_client: ClusterLimitServiceClient<Channel>,
2101    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
2102    monitor_client: MonitorServiceClient<Channel>,
2103}
2104
2105impl GrpcMetaClientCore {
2106    pub(crate) fn new(channel: Channel) -> Self {
2107        let cluster_client = ClusterServiceClient::new(channel.clone());
2108        let meta_member_client = MetaMemberClient::new(channel.clone());
2109        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2110        let ddl_client =
2111            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2112        let hummock_client =
2113            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2114        let notification_client =
2115            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2116        let stream_client =
2117            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2118        let user_client = UserServiceClient::new(channel.clone());
2119        let scale_client =
2120            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2121        let backup_client = BackupServiceClient::new(channel.clone());
2122        let telemetry_client =
2123            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2124        let system_params_client = SystemParamsServiceClient::new(channel.clone());
2125        let session_params_client = SessionParamServiceClient::new(channel.clone());
2126        let serving_client = ServingServiceClient::new(channel.clone());
2127        let cloud_client = CloudServiceClient::new(channel.clone());
2128        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2129            .max_decoding_message_size(usize::MAX);
2130        let event_log_client = EventLogServiceClient::new(channel.clone());
2131        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2132        let hosted_iceberg_catalog_service_client =
2133            HostedIcebergCatalogServiceClient::new(channel.clone());
2134        let monitor_client = MonitorServiceClient::new(channel);
2135
2136        GrpcMetaClientCore {
2137            cluster_client,
2138            meta_member_client,
2139            heartbeat_client,
2140            ddl_client,
2141            hummock_client,
2142            notification_client,
2143            stream_client,
2144            user_client,
2145            scale_client,
2146            backup_client,
2147            telemetry_client,
2148            system_params_client,
2149            session_params_client,
2150            serving_client,
2151            cloud_client,
2152            sink_coordinate_client,
2153            event_log_client,
2154            cluster_limit_client,
2155            hosted_iceberg_catalog_service_client,
2156            monitor_client,
2157        }
2158    }
2159}
2160
2161/// Client to meta server. Cloning the instance is lightweight.
2162///
2163/// It is a wrapper around the tonic clients. See [`crate::meta_rpc_client_method_impl`].
2164#[derive(Debug, Clone)]
2165struct GrpcMetaClient {
2166    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
2167    core: Arc<RwLock<GrpcMetaClientCore>>,
2168}
2169
2170type MetaMemberClient = MetaMemberServiceClient<Channel>;
2171
2172struct MetaMemberGroup {
2173    members: LruCache<http::Uri, Option<MetaMemberClient>>,
2174}
2175
2176struct MetaMemberManagement {
2177    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
2178    members: Either<MetaMemberClient, MetaMemberGroup>,
2179    current_leader: http::Uri,
2180    meta_config: Arc<MetaConfig>,
2181}
2182
2183impl MetaMemberManagement {
2184    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2185
2186    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2187        format!("http://{}:{}", addr.host, addr.port)
2188            .parse()
2189            .unwrap()
2190    }
2191
2192    async fn recreate_core(&self, channel: Channel) {
2193        let mut core = self.core_ref.write().await;
2194        *core = GrpcMetaClientCore::new(channel);
2195    }
2196
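    /// Refresh the known meta members and, if a different leader is discovered, reconnect
    /// the underlying channel to it and swap out the shared client core.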
2197    async fn refresh_members(&mut self) -> Result<()> {
2198        let leader_addr = match self.members.as_mut() {
2199            Either::Left(client) => {
2200                let resp = client
2201                    .to_owned()
2202                    .members(MembersRequest {})
2203                    .await
2204                    .map_err(RpcError::from_meta_status)?;
2205                let resp = resp.into_inner();
2206                resp.members.into_iter().find(|member| member.is_leader)
2207            }
2208            Either::Right(member_group) => {
2209                let mut fetched_members = None;
2210
2211                for (addr, client) in &mut member_group.members {
2212                    let members: Result<_> = try {
2213                        let mut client = match client {
2214                            Some(cached_client) => cached_client.to_owned(),
2215                            None => {
2216                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2217                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2218                                    .await
2219                                    .context("failed to create client")?;
2220                                let new_client: MetaMemberClient =
2221                                    MetaMemberServiceClient::new(channel);
2222                                *client = Some(new_client.clone());
2223
2224                                new_client
2225                            }
2226                        };
2227
2228                        let resp = client
2229                            .members(MembersRequest {})
2230                            .await
2231                            .context("failed to fetch members")?;
2232
2233                        resp.into_inner().members
2234                    };
2235
2236                    let fetched = members.is_ok();
2237                    fetched_members = Some(members);
2238                    if fetched {
2239                        break;
2240                    }
2241                }
2242
2243                let members = fetched_members
2244                    .context("no member available in the list")?
2245                    .context("could not refresh members")?;
2246
2247                // find new leader
2248                let mut leader = None;
2249                for member in members {
2250                    if member.is_leader {
2251                        leader = Some(member.clone());
2252                    }
2253
2254                    let addr = Self::host_address_to_uri(member.address.unwrap());
2255                    // We deliberately keep expired addresses here to cope with some extreme situations.
2256                    if !member_group.members.contains(&addr) {
2257                        tracing::info!("new meta member joined: {}", addr);
2258                        member_group.members.put(addr, None);
2259                    }
2260                }
2261
2262                leader
2263            }
2264        };
2265
2266        if let Some(leader) = leader_addr {
2267            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2268
2269            if discovered_leader != self.current_leader {
2270                tracing::info!("new meta leader {} discovered", discovered_leader);
2271
2272                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2273                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2274                    false,
2275                );
2276
2277                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2278                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2279                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2280                })
2281                .await?;
2282
2283                self.recreate_core(channel).await;
2284                self.current_leader = discovered_leader;
2285            }
2286        }
2287
2288        Ok(())
2289    }
2290}
2291
2292impl GrpcMetaClient {
2293    // See `Endpoint::http2_keep_alive_interval`
2294    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2295    // See `Endpoint::keep_alive_timeout`
2296    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2297    // Retry base interval in ms for connecting to meta server.
2298    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2299    // Max retry interval in ms for connecting to meta server.
2300    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2301
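    /// Spawn a background task that refreshes the meta member list: periodically when a
    /// member group (address list) is configured, and otherwise only on demand via the
    /// force-refresh channel.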
2302    fn start_meta_member_monitor(
2303        &self,
2304        init_leader_addr: http::Uri,
2305        members: Either<MetaMemberClient, MetaMemberGroup>,
2306        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2307        meta_config: Arc<MetaConfig>,
2308    ) -> Result<()> {
2309        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2310        let current_leader = init_leader_addr;
2311
2312        let enable_period_tick = matches!(members, Either::Right(_));
2313
2314        let member_management = MetaMemberManagement {
2315            core_ref,
2316            members,
2317            current_leader,
2318            meta_config,
2319        };
2320
2321        let mut force_refresh_receiver = force_refresh_receiver;
2322
2323        tokio::spawn(async move {
2324            let mut member_management = member_management;
2325            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2326
2327            loop {
2328                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2329                    tokio::select! {
2330                        _ = ticker.tick() => None,
2331                        result_sender = force_refresh_receiver.recv() => {
2332                            if result_sender.is_none() {
2333                                break;
2334                            }
2335
2336                            result_sender
2337                        },
2338                    }
2339                } else {
2340                    let result_sender = force_refresh_receiver.recv().await;
2341
2342                    if result_sender.is_none() {
2343                        break;
2344                    }
2345
2346                    result_sender
2347                };
2348
2349                let tick_result = member_management.refresh_members().await;
2350                if let Err(e) = tick_result.as_ref() {
2351                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2352                }
2353
2354                if let Some(sender) = event {
2355                    // ignore resp
2356                    let _resp = sender.send(tick_result);
2357                }
2358            }
2359        });
2360
2361        Ok(())
2362    }
2363
2364    async fn force_refresh_leader(&self) -> Result<()> {
2365        let (sender, receiver) = oneshot::channel();
2366
2367        self.member_monitor_event_sender
2368            .send(sender)
2369            .await
2370            .map_err(|e| anyhow!(e))?;
2371
2372        receiver.await.map_err(|e| anyhow!(e))?
2373    }
2374
2375    /// Connect to the meta server using the given address `strategy`.
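    ///
    /// A minimal usage sketch (illustrative only, assuming `MetaConfig` provides a usable
    /// `Default` and that the address parses into an `http::Uri`):
    ///
    /// ```ignore
    /// let strategy = MetaAddressStrategy::List(vec!["http://127.0.0.1:5690".parse()?]);
    /// let client = GrpcMetaClient::new(&strategy, Arc::new(MetaConfig::default())).await?;
    /// ```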
2376    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2377        let (channel, addr) = match strategy {
2378            MetaAddressStrategy::LoadBalance(addr) => {
2379                Self::try_build_rpc_channel(vec![addr.clone()]).await
2380            }
2381            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2382        }?;
2383        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2384        let client = GrpcMetaClient {
2385            member_monitor_event_sender: force_refresh_sender,
2386            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2387        };
2388
2389        let meta_member_client = client.core.read().await.meta_member_client.clone();
2390        let members = match strategy {
2391            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2392            MetaAddressStrategy::List(addrs) => {
2393                let mut members = LruCache::new(20.try_into().unwrap());
2394                for addr in addrs {
2395                    members.put(addr.clone(), None);
2396                }
2397                members.put(addr.clone(), Some(meta_member_client));
2398
2399                Either::Right(MetaMemberGroup { members })
2400            }
2401        };
2402
2403        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2404
2405        client.force_refresh_leader().await?;
2406
2407        Ok(client)
2408    }
2409
2410    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2411        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2412    }
2413
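    /// Try to establish a gRPC channel to the given addresses in order, returning the first
    /// channel that connects successfully together with the address it connected to.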
2414    pub(crate) async fn try_build_rpc_channel(
2415        addrs: impl IntoIterator<Item = http::Uri>,
2416    ) -> Result<(Channel, http::Uri)> {
2417        let endpoints: Vec<_> = addrs
2418            .into_iter()
2419            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2420            .collect();
2421
2422        let mut last_error = None;
2423
2424        for (endpoint, addr) in endpoints {
2425            match Self::connect_to_endpoint(endpoint).await {
2426                Ok(channel) => {
2427                    tracing::info!("Connected to meta server {} successfully", addr);
2428                    return Ok((channel, addr));
2429                }
2430                Err(e) => {
2431                    tracing::warn!(
2432                        error = %e.as_report(),
2433                        "Failed to connect to meta server {}, trying the next address",
2434                        addr,
2435                    );
2436                    last_error = Some(e);
2437                }
2438            }
2439        }
2440
2441        if let Some(last_error) = last_error {
2442            Err(anyhow::anyhow!(last_error)
2443                .context("failed to connect to all meta servers")
2444                .into())
2445        } else {
2446            bail!("no meta server address provided")
2447        }
2448    }
2449
2450    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2451        let channel = endpoint
2452            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2453            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2454            .connect_timeout(Duration::from_secs(5))
2455            .monitored_connect("grpc-meta-client", Default::default())
2456            .await?
2457            .wrapped();
2458
2459        Ok(channel)
2460    }
2461
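    /// Build a jittered exponential backoff iterator whose accumulated delay stays within
    /// `high_bound`. When `exceed` is true, one extra delay that crosses the bound is allowed.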
2462    pub(crate) fn retry_strategy_to_bound(
2463        high_bound: Duration,
2464        exceed: bool,
2465    ) -> impl Iterator<Item = Duration> {
2466        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2467            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2468            .map(jitter);
2469
2470        let mut sum = Duration::default();
2471
2472        iter.take_while(move |duration| {
2473            sum += *duration;
2474
2475            if exceed {
2476                sum < high_bound + *duration
2477            } else {
2478                sum < high_bound
2479            }
2480        })
2481    }
2482}
2483
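/// Invokes the given macro with the full list of `{ client_field, rpc_name, Request, Response }`
/// tuples, so that `meta_rpc_client_method_impl` can generate a delegating method on
/// `GrpcMetaClient` for every meta RPC below.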
2484macro_rules! for_all_meta_rpc {
2485    ($macro:ident) => {
2486        $macro! {
2487             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2488            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2489            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2490            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2491            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2492            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2493            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2494            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2495            ,{ stream_client, flush, FlushRequest, FlushResponse }
2496            ,{ stream_client, pause, PauseRequest, PauseResponse }
2497            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2498            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2499            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2500            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2501            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2502            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2503            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2504            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2505            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2506            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
2507            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2508            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2509            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
2510            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
2511            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2512            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2513            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
2514            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
2515            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2516            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2517            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
2518            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2519            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2520            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2521            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2522            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2523            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
2524            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
2525            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2526            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2527            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2528            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2529            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2530            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2531            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2532            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2533            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2534            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2535            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2536            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2537            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2538            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2539            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2540            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2541            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2542            ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
2543            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2544            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2545            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2546            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2547            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2548            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2549            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2550            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2551            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2552            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2553            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2554            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2555            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2556            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2557            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2558            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2559            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2560            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2561            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2562            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2563            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2564            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
2565            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
2566            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2567            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2568            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2569            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2570            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2571            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2572            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2573            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2574            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2575            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2576            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2577            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2578            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2579            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2580            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2581            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2582            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2583            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2584            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2585            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2586            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2587            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2588            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2589            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2590            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2591            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2592            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2593            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2594            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2595            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2596            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2597            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2598            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2599            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2600            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2601            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2602            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2603            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2604            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2605            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2606            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2607            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2608            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2609            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2610            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2611            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2612            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2613            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2614            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2615            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2616            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2617            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2618            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2619            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2620            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2621            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2622        }
2623    };
2624}
2625
2626impl GrpcMetaClient {
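    /// If an RPC failed with a status code that may indicate a stale or unreachable meta
    /// leader (`Unknown`, `Unimplemented`, or `Unavailable`), ask the member monitor to
    /// force-refresh the leader, unless a refresh is already in flight.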
2627    async fn refresh_client_if_needed(&self, code: Code) {
2628        if matches!(
2629            code,
2630            Code::Unknown | Code::Unimplemented | Code::Unavailable
2631        ) {
2632            tracing::debug!("matching tonic code {}", code);
2633            let (result_sender, result_receiver) = oneshot::channel();
2634            if self
2635                .member_monitor_event_sender
2636                .try_send(result_sender)
2637                .is_ok()
2638            {
2639                if let Ok(Err(e)) = result_receiver.await {
2640                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2641                }
2642            } else {
2643                tracing::debug!("skipping the current refresh, another refresh is already in progress")
2644            }
2645        }
2646    }
2647}
2648
2649impl GrpcMetaClient {
2650    for_all_meta_rpc! { meta_rpc_client_method_impl }
2651}