risingwave_rpc_client/meta_client.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{
33    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
34};
35use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
36use risingwave_common::hash::WorkerSlotMapping;
37use risingwave_common::id::{
38    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, UserId, ViewId, WorkerId,
39};
40use risingwave_common::monitor::EndpointExt;
41use risingwave_common::system_param::reader::SystemParamsReader;
42use risingwave_common::telemetry::report::TelemetryInfoFetcher;
43use risingwave_common::util::addr::HostAddr;
44use risingwave_common::util::meta_addr::MetaAddressStrategy;
45use risingwave_common::util::resource_util::cpu::total_cpu_available;
46use risingwave_common::util::resource_util::hostname;
47use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
48use risingwave_error::bail;
49use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
50use risingwave_hummock_sdk::change_log::{TableChangeLog, TableChangeLogs};
51use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
52use risingwave_hummock_sdk::{
53    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
54};
55use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
56use risingwave_pb::backup_service::*;
57use risingwave_pb::catalog::{
58    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
59    PbSubscription, PbTable, PbView, Table,
60};
61use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
62use risingwave_pb::cloud_service::*;
63use risingwave_pb::common::worker_node::Property;
64use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
65use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
66use risingwave_pb::ddl_service::alter_owner_request::Object;
67use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
68use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
69use risingwave_pb::ddl_service::*;
70use risingwave_pb::hummock::compact_task::TaskStatus;
71use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
72use risingwave_pb::hummock::get_table_change_logs_request::PbTableFilter;
73use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
74use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
75use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
76use risingwave_pb::hummock::write_limits::WriteLimit;
77use risingwave_pb::hummock::*;
78use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
79use risingwave_pb::iceberg_compaction::{
80    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
81    subscribe_iceberg_compaction_event_request,
82};
83use risingwave_pb::id::{ActorId, FragmentId, HummockSstableId, SourceId};
84use risingwave_pb::meta::alter_connector_props_request::{
85    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
86};
87use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
88use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
89use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
90use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
91use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
92use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
93use risingwave_pb::meta::list_actor_states_response::ActorState;
94use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
95use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
96use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
97use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
98use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
99use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
100use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
101use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
102use risingwave_pb::meta::serving_service_client::ServingServiceClient;
103use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
104use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
105use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
106use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
107use risingwave_pb::meta::{FragmentDistribution, *};
108use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
109use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
110use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
111use risingwave_pb::secret::PbSecretRef;
112use risingwave_pb::stream_plan::StreamFragmentGraph;
113use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
114use risingwave_pb::user::update_user_request::UpdateField;
115use risingwave_pb::user::user_service_client::UserServiceClient;
116use risingwave_pb::user::*;
117use thiserror_ext::AsReport;
118use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
119use tokio::sync::oneshot::Sender;
120use tokio::sync::{RwLock, mpsc, oneshot};
121use tokio::task::JoinHandle;
122use tokio::time::{self};
123use tokio_retry::strategy::{ExponentialBackoff, jitter};
124use tokio_stream::wrappers::UnboundedReceiverStream;
125use tonic::transport::Endpoint;
126use tonic::{Code, Request, Streaming};
127
128use crate::channel::{Channel, WrappedChannelExt};
129use crate::error::{Result, RpcError};
130use crate::hummock_meta_client::{
131    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
132    IcebergCompactionEventItem,
133};
134use crate::meta_rpc_client_method_impl;
135
136/// Client to meta server. Cloning the instance is lightweight.
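///
/// A minimal usage sketch (hypothetical caller code): the client is created once,
/// e.g. via [`MetaClient::register_new`], and then cloned freely into background
/// tasks, since cloning is lightweight as noted above.
///
/// ```ignore
/// let client = meta_client.clone();
/// tokio::spawn(async move {
///     // Every clone talks to the same meta service.
///     let _ = client.send_heartbeat().await;
/// });
/// ```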
137#[derive(Clone, Debug)]
138pub struct MetaClient {
139    worker_id: WorkerId,
140    worker_type: WorkerType,
141    host_addr: HostAddr,
142    inner: GrpcMetaClient,
143    meta_config: Arc<MetaConfig>,
144    cluster_id: String,
145    shutting_down: Arc<AtomicBool>,
146}
147
148impl MetaClient {
149    pub fn worker_id(&self) -> WorkerId {
150        self.worker_id
151    }
152
153    pub fn host_addr(&self) -> &HostAddr {
154        &self.host_addr
155    }
156
157    pub fn worker_type(&self) -> WorkerType {
158        self.worker_type
159    }
160
161    pub fn cluster_id(&self) -> &str {
162        &self.cluster_id
163    }
164
165    /// Subscribe to notifications from the meta service.
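    ///
    /// A sketch of consuming the returned stream (hypothetical caller code; the
    /// subscribe type and handler are illustrative only):
    ///
    /// ```ignore
    /// let mut stream = meta_client.subscribe(SubscribeType::Frontend).await?;
    /// while let Some(resp) = stream.message().await? {
    ///     // Dispatch the notification to local caches / observers.
    ///     handle_notification(resp);
    /// }
    /// ```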
166    pub async fn subscribe(
167        &self,
168        subscribe_type: SubscribeType,
169    ) -> Result<Streaming<SubscribeResponse>> {
170        let request = SubscribeRequest {
171            subscribe_type: subscribe_type as i32,
172            host: Some(self.host_addr.to_protobuf()),
173            worker_id: self.worker_id(),
174        };
175
176        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
177            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
178            true,
179        );
180
181        tokio_retry::Retry::spawn(retry_strategy, || async {
182            let request = request.clone();
183            self.inner.subscribe(request).await
184        })
185        .await
186    }
187
188    pub async fn create_connection(
189        &self,
190        connection_name: String,
191        database_id: DatabaseId,
192        schema_id: SchemaId,
193        owner_id: UserId,
194        req: create_connection_request::Payload,
195    ) -> Result<WaitVersion> {
196        let request = CreateConnectionRequest {
197            name: connection_name,
198            database_id,
199            schema_id,
200            owner_id,
201            payload: Some(req),
202        };
203        let resp = self.inner.create_connection(request).await?;
204        Ok(resp
205            .version
206            .ok_or_else(|| anyhow!("wait version not set"))?)
207    }
208
209    pub async fn create_secret(
210        &self,
211        secret_name: String,
212        database_id: DatabaseId,
213        schema_id: SchemaId,
214        owner_id: UserId,
215        value: Vec<u8>,
216    ) -> Result<WaitVersion> {
217        let request = CreateSecretRequest {
218            name: secret_name,
219            database_id,
220            schema_id,
221            owner_id,
222            value,
223        };
224        let resp = self.inner.create_secret(request).await?;
225        Ok(resp
226            .version
227            .ok_or_else(|| anyhow!("wait version not set"))?)
228    }
229
230    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
231        let request = ListConnectionsRequest {};
232        let resp = self.inner.list_connections(request).await?;
233        Ok(resp.connections)
234    }
235
236    pub async fn drop_connection(
237        &self,
238        connection_id: ConnectionId,
239        cascade: bool,
240    ) -> Result<WaitVersion> {
241        let request = DropConnectionRequest {
242            connection_id,
243            cascade,
244        };
245        let resp = self.inner.drop_connection(request).await?;
246        Ok(resp
247            .version
248            .ok_or_else(|| anyhow!("wait version not set"))?)
249    }
250
251    pub async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<WaitVersion> {
252        let request = DropSecretRequest { secret_id, cascade };
253        let resp = self.inner.drop_secret(request).await?;
254        Ok(resp
255            .version
256            .ok_or_else(|| anyhow!("wait version not set"))?)
257    }
258
259    /// Register the current node with the cluster and set the corresponding worker id.
260    ///
261    /// Retries on transient connection issues with the meta node; exits the process if registration ultimately fails.
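    ///
    /// A rough sketch of the typical startup sequence (illustrative only; the
    /// address, property, config and interval values are assumed to be provided
    /// by the caller):
    ///
    /// ```ignore
    /// let (meta_client, system_params) = MetaClient::register_new(
    ///     addr_strategy,
    ///     WorkerType::ComputeNode,
    ///     &advertise_addr,
    ///     worker_property,
    ///     meta_config,
    /// )
    /// .await;
    /// meta_client.activate(&advertise_addr).await?;
    /// let (heartbeat_handle, heartbeat_shutdown) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), heartbeat_interval);
    /// ```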
262    pub async fn register_new(
263        addr_strategy: MetaAddressStrategy,
264        worker_type: WorkerType,
265        addr: &HostAddr,
266        property: Property,
267        meta_config: Arc<MetaConfig>,
268    ) -> (Self, SystemParamsReader) {
269        let ret =
270            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
271
272        match ret {
273            Ok(ret) => ret,
274            Err(err) => {
275                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
276                std::process::exit(1);
277            }
278        }
279    }
280
281    async fn register_new_inner(
282        addr_strategy: MetaAddressStrategy,
283        worker_type: WorkerType,
284        addr: &HostAddr,
285        property: Property,
286        meta_config: Arc<MetaConfig>,
287    ) -> Result<(Self, SystemParamsReader)> {
288        tracing::info!("register meta client using strategy: {}", addr_strategy);
289
290        // Retry until reaching `max_heartbeat_interval_secs`
291        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
292            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
293            true,
294        );
295
296        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
297            retry_strategy,
298            || async {
299                let grpc_meta_client =
300                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
301
302                let add_worker_resp = grpc_meta_client
303                    .add_worker_node(AddWorkerNodeRequest {
304                        worker_type: worker_type as i32,
305                        host: Some(addr.to_protobuf()),
306                        property: Some(property.clone()),
307                        resource: Some(risingwave_pb::common::worker_node::Resource {
308                            rw_version: RW_VERSION.to_owned(),
309                            total_memory_bytes: system_memory_available_bytes() as _,
310                            total_cpu_cores: total_cpu_available() as _,
311                            hostname: hostname(),
312                        }),
313                    })
314                    .await
315                    .context("failed to add worker node")?;
316
317                let system_params_resp = grpc_meta_client
318                    .get_system_params(GetSystemParamsRequest {})
319                    .await
320                    .context("failed to get initial system params")?;
321
322                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
323            },
324            // Only retry if there's a transient connection issue.
325            // If the error originates from our implementation or business logic, do not retry.
326            |e: &RpcError| !e.is_from_tonic_server_impl(),
327        )
328        .await;
329
330        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
331        let worker_id = add_worker_resp
332            .node_id
333            .expect("AddWorkerNodeResponse::node_id is empty");
334
335        let meta_client = Self {
336            worker_id,
337            worker_type,
338            host_addr: addr.clone(),
339            inner: grpc_meta_client,
340            meta_config: meta_config.clone(),
341            cluster_id: add_worker_resp.cluster_id,
342            shutting_down: Arc::new(false.into()),
343        };
344
345        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
346        REPORT_PANIC.call_once(|| {
347            let meta_client_clone = meta_client.clone();
348            std::panic::update_hook(move |default_hook, info| {
349                // Try to report panic event to meta node.
350                meta_client_clone.try_add_panic_event_blocking(info, None);
351                default_hook(info);
352            });
353        });
354
355        Ok((meta_client, system_params_resp.params.unwrap().into()))
356    }
357
358    /// Activate the current node in the cluster to confirm it's ready to serve.
359    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
360        let request = ActivateWorkerNodeRequest {
361            host: Some(addr.to_protobuf()),
362            node_id: self.worker_id,
363        };
364        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
365            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
366            true,
367        );
368        tokio_retry::Retry::spawn(retry_strategy, || async {
369            let request = request.clone();
370            self.inner.activate_worker_node(request).await
371        })
372        .await?;
373
374        Ok(())
375    }
376
377    /// Send a heartbeat signal to the meta service.
378    pub async fn send_heartbeat(&self) -> Result<()> {
379        let request = HeartbeatRequest {
380            node_id: self.worker_id,
381            resource: Some(risingwave_pb::common::worker_node::Resource {
382                rw_version: RW_VERSION.to_owned(),
383                total_memory_bytes: system_memory_available_bytes() as _,
384                total_cpu_cores: total_cpu_available() as _,
385                hostname: hostname(),
386            }),
387        };
388        let resp = self.inner.heartbeat(request).await?;
389        if let Some(status) = resp.status
390            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
391        {
392            // Ignore the error if we're already shutting down.
393            // Otherwise, exit the process.
394            if !self.shutting_down.load(Relaxed) {
395                tracing::error!(message = status.message, "worker expired");
396                std::process::exit(1);
397            }
398        }
399        Ok(())
400    }
401
402    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
403        let request = CreateDatabaseRequest { db: Some(db) };
404        let resp = self.inner.create_database(request).await?;
405        // TODO: handle error in `resp.status` here
406        Ok(resp
407            .version
408            .ok_or_else(|| anyhow!("wait version not set"))?)
409    }
410
411    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
412        let request = CreateSchemaRequest {
413            schema: Some(schema),
414        };
415        let resp = self.inner.create_schema(request).await?;
416        // TODO: handle error in `resp.status` here
417        Ok(resp
418            .version
419            .ok_or_else(|| anyhow!("wait version not set"))?)
420    }
421
422    pub async fn create_materialized_view(
423        &self,
424        table: PbTable,
425        graph: StreamFragmentGraph,
426        dependencies: HashSet<ObjectId>,
427        resource_type: streaming_job_resource_type::ResourceType,
428        if_not_exists: bool,
429    ) -> Result<WaitVersion> {
430        let request = CreateMaterializedViewRequest {
431            materialized_view: Some(table),
432            fragment_graph: Some(graph),
433            resource_type: Some(PbStreamingJobResourceType {
434                resource_type: Some(resource_type),
435            }),
436            dependencies: dependencies.into_iter().collect(),
437            if_not_exists,
438        };
439        let resp = self.inner.create_materialized_view(request).await?;
440        // TODO: handle error in `resp.status` here
441        Ok(resp
442            .version
443            .ok_or_else(|| anyhow!("wait version not set"))?)
444    }
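
    // The DDL methods in this impl share a common pattern: the meta service returns
    // a catalog `WaitVersion`, which the caller can use to wait until its local
    // catalog snapshot has caught up before reporting success. A minimal sketch of
    // that pattern (hypothetical caller; `wait_for_catalog_version` is an assumed
    // helper, not part of this crate):
    //
    //     let version = meta_client
    //         .create_materialized_view(table, graph, dependencies, resource_type, false)
    //         .await?;
    //     // Block until the local catalog reflects at least `version`.
    //     wait_for_catalog_version(version).await;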
445
446    pub async fn drop_materialized_view(
447        &self,
448        table_id: TableId,
449        cascade: bool,
450    ) -> Result<WaitVersion> {
451        let request = DropMaterializedViewRequest { table_id, cascade };
452
453        let resp = self.inner.drop_materialized_view(request).await?;
454        Ok(resp
455            .version
456            .ok_or_else(|| anyhow!("wait version not set"))?)
457    }
458
459    pub async fn create_source(
460        &self,
461        source: PbSource,
462        graph: Option<StreamFragmentGraph>,
463        if_not_exists: bool,
464    ) -> Result<WaitVersion> {
465        let request = CreateSourceRequest {
466            source: Some(source),
467            fragment_graph: graph,
468            if_not_exists,
469        };
470
471        let resp = self.inner.create_source(request).await?;
472        Ok(resp
473            .version
474            .ok_or_else(|| anyhow!("wait version not set"))?)
475    }
476
477    pub async fn create_sink(
478        &self,
479        sink: PbSink,
480        graph: StreamFragmentGraph,
481        dependencies: HashSet<ObjectId>,
482        if_not_exists: bool,
483    ) -> Result<WaitVersion> {
484        let request = CreateSinkRequest {
485            sink: Some(sink),
486            fragment_graph: Some(graph),
487            dependencies: dependencies.into_iter().collect(),
488            if_not_exists,
489        };
490
491        let resp = self.inner.create_sink(request).await?;
492        Ok(resp
493            .version
494            .ok_or_else(|| anyhow!("wait version not set"))?)
495    }
496
497    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
498        let request = CreateSubscriptionRequest {
499            subscription: Some(subscription),
500        };
501
502        let resp = self.inner.create_subscription(request).await?;
503        Ok(resp
504            .version
505            .ok_or_else(|| anyhow!("wait version not set"))?)
506    }
507
508    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
509        let request = CreateFunctionRequest {
510            function: Some(function),
511        };
512        let resp = self.inner.create_function(request).await?;
513        Ok(resp
514            .version
515            .ok_or_else(|| anyhow!("wait version not set"))?)
516    }
517
518    pub async fn create_table(
519        &self,
520        source: Option<PbSource>,
521        table: PbTable,
522        graph: StreamFragmentGraph,
523        job_type: PbTableJobType,
524        if_not_exists: bool,
525        dependencies: HashSet<ObjectId>,
526    ) -> Result<WaitVersion> {
527        let request = CreateTableRequest {
528            materialized_view: Some(table),
529            fragment_graph: Some(graph),
530            source,
531            job_type: job_type as _,
532            if_not_exists,
533            dependencies: dependencies.into_iter().collect(),
534        };
535        let resp = self.inner.create_table(request).await?;
536        // TODO: handle error in `resp.status` here
537        Ok(resp
538            .version
539            .ok_or_else(|| anyhow!("wait version not set"))?)
540    }
541
542    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
543        let request = CommentOnRequest {
544            comment: Some(comment),
545        };
546        let resp = self.inner.comment_on(request).await?;
547        Ok(resp
548            .version
549            .ok_or_else(|| anyhow!("wait version not set"))?)
550    }
551
552    pub async fn alter_name(
553        &self,
554        object: alter_name_request::Object,
555        name: &str,
556    ) -> Result<WaitVersion> {
557        let request = AlterNameRequest {
558            object: Some(object),
559            new_name: name.to_owned(),
560        };
561        let resp = self.inner.alter_name(request).await?;
562        Ok(resp
563            .version
564            .ok_or_else(|| anyhow!("wait version not set"))?)
565    }
566
567    pub async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<WaitVersion> {
568        let request = AlterOwnerRequest {
569            object: Some(object),
570            owner_id,
571        };
572        let resp = self.inner.alter_owner(request).await?;
573        Ok(resp
574            .version
575            .ok_or_else(|| anyhow!("wait version not set"))?)
576    }
577
578    pub async fn alter_subscription_retention(
579        &self,
580        subscription_id: SubscriptionId,
581        retention_seconds: u64,
582        definition: String,
583    ) -> Result<WaitVersion> {
584        let request = AlterSubscriptionRetentionRequest {
585            subscription_id,
586            retention_seconds,
587            definition,
588        };
589        let resp = self.inner.alter_subscription_retention(request).await?;
590        Ok(resp
591            .version
592            .ok_or_else(|| anyhow!("wait version not set"))?)
593    }
594
595    pub async fn alter_database_param(
596        &self,
597        database_id: DatabaseId,
598        param: AlterDatabaseParam,
599    ) -> Result<WaitVersion> {
600        let request = match param {
601            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
602                let barrier_interval_ms = OptionalUint32 {
603                    value: barrier_interval_ms,
604                };
605                AlterDatabaseParamRequest {
606                    database_id,
607                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
608                        barrier_interval_ms,
609                    )),
610                }
611            }
612            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
613                let checkpoint_frequency = OptionalUint64 {
614                    value: checkpoint_frequency,
615                };
616                AlterDatabaseParamRequest {
617                    database_id,
618                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
619                        checkpoint_frequency,
620                    )),
621                }
622            }
623        };
624        let resp = self.inner.alter_database_param(request).await?;
625        Ok(resp
626            .version
627            .ok_or_else(|| anyhow!("wait version not set"))?)
628    }
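
    // A usage sketch for `alter_database_param` (illustrative; assumes the
    // `BarrierIntervalMs` variant carries an `Option<u32>`, matching the
    // `OptionalUint32` it is mapped to above):
    //
    //     meta_client
    //         .alter_database_param(database_id, AlterDatabaseParam::BarrierIntervalMs(Some(500)))
    //         .await?;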
629
630    pub async fn alter_set_schema(
631        &self,
632        object: alter_set_schema_request::Object,
633        new_schema_id: SchemaId,
634    ) -> Result<WaitVersion> {
635        let request = AlterSetSchemaRequest {
636            new_schema_id,
637            object: Some(object),
638        };
639        let resp = self.inner.alter_set_schema(request).await?;
640        Ok(resp
641            .version
642            .ok_or_else(|| anyhow!("wait version not set"))?)
643    }
644
645    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
646        let request = AlterSourceRequest {
647            source: Some(source),
648        };
649        let resp = self.inner.alter_source(request).await?;
650        Ok(resp
651            .version
652            .ok_or_else(|| anyhow!("wait version not set"))?)
653    }
654
655    pub async fn alter_parallelism(
656        &self,
657        job_id: JobId,
658        parallelism: PbTableParallelism,
659        deferred: bool,
660    ) -> Result<()> {
661        let request = AlterParallelismRequest {
662            table_id: job_id,
663            parallelism: Some(parallelism),
664            deferred,
665        };
666
667        self.inner.alter_parallelism(request).await?;
668        Ok(())
669    }
670
671    pub async fn alter_backfill_parallelism(
672        &self,
673        job_id: JobId,
674        parallelism: Option<PbTableParallelism>,
675        deferred: bool,
676    ) -> Result<()> {
677        let request = AlterBackfillParallelismRequest {
678            table_id: job_id,
679            parallelism,
680            deferred,
681        };
682
683        self.inner.alter_backfill_parallelism(request).await?;
684        Ok(())
685    }
686
687    pub async fn alter_streaming_job_config(
688        &self,
689        job_id: JobId,
690        entries_to_add: HashMap<String, String>,
691        keys_to_remove: Vec<String>,
692    ) -> Result<()> {
693        let request = AlterStreamingJobConfigRequest {
694            job_id,
695            entries_to_add,
696            keys_to_remove,
697        };
698
699        self.inner.alter_streaming_job_config(request).await?;
700        Ok(())
701    }
702
703    pub async fn alter_fragment_parallelism(
704        &self,
705        fragment_ids: Vec<FragmentId>,
706        parallelism: Option<PbTableParallelism>,
707    ) -> Result<()> {
708        let request = AlterFragmentParallelismRequest {
709            fragment_ids,
710            parallelism,
711        };
712
713        self.inner.alter_fragment_parallelism(request).await?;
714        Ok(())
715    }
716
717    pub async fn alter_cdc_table_backfill_parallelism(
718        &self,
719        table_id: JobId,
720        parallelism: PbTableParallelism,
721    ) -> Result<()> {
722        let request = AlterCdcTableBackfillParallelismRequest {
723            table_id,
724            parallelism: Some(parallelism),
725        };
726        self.inner
727            .alter_cdc_table_backfill_parallelism(request)
728            .await?;
729        Ok(())
730    }
731
732    pub async fn alter_resource_group(
733        &self,
734        table_id: TableId,
735        resource_group: Option<String>,
736        deferred: bool,
737    ) -> Result<()> {
738        let request = AlterResourceGroupRequest {
739            table_id,
740            resource_group,
741            deferred,
742        };
743
744        self.inner.alter_resource_group(request).await?;
745        Ok(())
746    }
747
748    pub async fn alter_swap_rename(
749        &self,
750        object: alter_swap_rename_request::Object,
751    ) -> Result<WaitVersion> {
752        let request = AlterSwapRenameRequest {
753            object: Some(object),
754        };
755        let resp = self.inner.alter_swap_rename(request).await?;
756        Ok(resp
757            .version
758            .ok_or_else(|| anyhow!("wait version not set"))?)
759    }
760
761    pub async fn alter_secret(
762        &self,
763        secret_id: SecretId,
764        secret_name: String,
765        database_id: DatabaseId,
766        schema_id: SchemaId,
767        owner_id: UserId,
768        value: Vec<u8>,
769    ) -> Result<WaitVersion> {
770        let request = AlterSecretRequest {
771            secret_id,
772            name: secret_name,
773            database_id,
774            schema_id,
775            owner_id,
776            value,
777        };
778        let resp = self.inner.alter_secret(request).await?;
779        Ok(resp
780            .version
781            .ok_or_else(|| anyhow!("wait version not set"))?)
782    }
783
784    pub async fn replace_job(
785        &self,
786        graph: StreamFragmentGraph,
787        replace_job: ReplaceJob,
788    ) -> Result<WaitVersion> {
789        let request = ReplaceJobPlanRequest {
790            plan: Some(ReplaceJobPlan {
791                fragment_graph: Some(graph),
792                replace_job: Some(replace_job),
793            }),
794        };
795        let resp = self.inner.replace_job_plan(request).await?;
796        // TODO: handle error in `resp.status` here
797        Ok(resp
798            .version
799            .ok_or_else(|| anyhow!("wait version not set"))?)
800    }
801
802    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
803        let request = AutoSchemaChangeRequest {
804            schema_change: Some(schema_change),
805        };
806        let _ = self.inner.auto_schema_change(request).await?;
807        Ok(())
808    }
809
810    pub async fn create_view(
811        &self,
812        view: PbView,
813        dependencies: HashSet<ObjectId>,
814    ) -> Result<WaitVersion> {
815        let request = CreateViewRequest {
816            view: Some(view),
817            dependencies: dependencies.into_iter().collect(),
818        };
819        let resp = self.inner.create_view(request).await?;
820        // TODO: handle error in `resp.status` here
821        Ok(resp
822            .version
823            .ok_or_else(|| anyhow!("wait version not set"))?)
824    }
825
826    pub async fn create_index(
827        &self,
828        index: PbIndex,
829        table: PbTable,
830        graph: StreamFragmentGraph,
831        if_not_exists: bool,
832    ) -> Result<WaitVersion> {
833        let request = CreateIndexRequest {
834            index: Some(index),
835            index_table: Some(table),
836            fragment_graph: Some(graph),
837            if_not_exists,
838        };
839        let resp = self.inner.create_index(request).await?;
840        // TODO: handle error in `resp.status` here
841        Ok(resp
842            .version
843            .ok_or_else(|| anyhow!("wait version not set"))?)
844    }
845
846    pub async fn drop_table(
847        &self,
848        source_id: Option<SourceId>,
849        table_id: TableId,
850        cascade: bool,
851    ) -> Result<WaitVersion> {
852        let request = DropTableRequest {
853            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
854            table_id,
855            cascade,
856        };
857
858        let resp = self.inner.drop_table(request).await?;
859        Ok(resp
860            .version
861            .ok_or_else(|| anyhow!("wait version not set"))?)
862    }
863
864    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
865        let request = CompactIcebergTableRequest { sink_id };
866        let resp = self.inner.compact_iceberg_table(request).await?;
867        Ok(resp.task_id)
868    }
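
    // Sketch: manually trigger an Iceberg table compaction for a sink and record the
    // returned task id (illustrative caller code):
    //
    //     let task_id = meta_client.compact_iceberg_table(sink_id).await?;
    //     tracing::info!(task_id, "triggered iceberg table compaction");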
869
870    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
871        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
872        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
873        Ok(())
874    }
875
876    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
877        let request = DropViewRequest { view_id, cascade };
878        let resp = self.inner.drop_view(request).await?;
879        Ok(resp
880            .version
881            .ok_or_else(|| anyhow!("wait version not set"))?)
882    }
883
884    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
885        let request = DropSourceRequest { source_id, cascade };
886        let resp = self.inner.drop_source(request).await?;
887        Ok(resp
888            .version
889            .ok_or_else(|| anyhow!("wait version not set"))?)
890    }
891
892    pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
893        let request = ResetSourceRequest { source_id };
894        let resp = self.inner.reset_source(request).await?;
895        Ok(resp
896            .version
897            .ok_or_else(|| anyhow!("wait version not set"))?)
898    }
899
900    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
901        let request = DropSinkRequest { sink_id, cascade };
902        let resp = self.inner.drop_sink(request).await?;
903        Ok(resp
904            .version
905            .ok_or_else(|| anyhow!("wait version not set"))?)
906    }
907
908    pub async fn drop_subscription(
909        &self,
910        subscription_id: SubscriptionId,
911        cascade: bool,
912    ) -> Result<WaitVersion> {
913        let request = DropSubscriptionRequest {
914            subscription_id,
915            cascade,
916        };
917        let resp = self.inner.drop_subscription(request).await?;
918        Ok(resp
919            .version
920            .ok_or_else(|| anyhow!("wait version not set"))?)
921    }
922
923    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
924        let request = DropIndexRequest { index_id, cascade };
925        let resp = self.inner.drop_index(request).await?;
926        Ok(resp
927            .version
928            .ok_or_else(|| anyhow!("wait version not set"))?)
929    }
930
931    pub async fn drop_function(
932        &self,
933        function_id: FunctionId,
934        cascade: bool,
935    ) -> Result<WaitVersion> {
936        let request = DropFunctionRequest {
937            function_id,
938            cascade,
939        };
940        let resp = self.inner.drop_function(request).await?;
941        Ok(resp
942            .version
943            .ok_or_else(|| anyhow!("wait version not set"))?)
944    }
945
946    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
947        let request = DropDatabaseRequest { database_id };
948        let resp = self.inner.drop_database(request).await?;
949        Ok(resp
950            .version
951            .ok_or_else(|| anyhow!("wait version not set"))?)
952    }
953
954    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
955        let request = DropSchemaRequest { schema_id, cascade };
956        let resp = self.inner.drop_schema(request).await?;
957        Ok(resp
958            .version
959            .ok_or_else(|| anyhow!("wait version not set"))?)
960    }
961
962    // TODO: use `UserInfoVersion` as the return type instead.
963    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
964        let request = CreateUserRequest { user: Some(user) };
965        let resp = self.inner.create_user(request).await?;
966        Ok(resp.version)
967    }
968
969    pub async fn drop_user(&self, user_id: UserId) -> Result<u64> {
970        let request = DropUserRequest { user_id };
971        let resp = self.inner.drop_user(request).await?;
972        Ok(resp.version)
973    }
974
975    pub async fn update_user(
976        &self,
977        user: UserInfo,
978        update_fields: Vec<UpdateField>,
979    ) -> Result<u64> {
980        let request = UpdateUserRequest {
981            user: Some(user),
982            update_fields: update_fields
983                .into_iter()
984                .map(|field| field as i32)
985                .collect::<Vec<_>>(),
986        };
987        let resp = self.inner.update_user(request).await?;
988        Ok(resp.version)
989    }
990
991    pub async fn grant_privilege(
992        &self,
993        user_ids: Vec<UserId>,
994        privileges: Vec<GrantPrivilege>,
995        with_grant_option: bool,
996        granted_by: UserId,
997    ) -> Result<u64> {
998        let request = GrantPrivilegeRequest {
999            user_ids,
1000            privileges,
1001            with_grant_option,
1002            granted_by,
1003        };
1004        let resp = self.inner.grant_privilege(request).await?;
1005        Ok(resp.version)
1006    }
1007
1008    pub async fn revoke_privilege(
1009        &self,
1010        user_ids: Vec<UserId>,
1011        privileges: Vec<GrantPrivilege>,
1012        granted_by: UserId,
1013        revoke_by: UserId,
1014        revoke_grant_option: bool,
1015        cascade: bool,
1016    ) -> Result<u64> {
1017        let request = RevokePrivilegeRequest {
1018            user_ids,
1019            privileges,
1020            granted_by,
1021            revoke_by,
1022            revoke_grant_option,
1023            cascade,
1024        };
1025        let resp = self.inner.revoke_privilege(request).await?;
1026        Ok(resp.version)
1027    }
1028
1029    pub async fn alter_default_privilege(
1030        &self,
1031        user_ids: Vec<UserId>,
1032        database_id: DatabaseId,
1033        schema_ids: Vec<SchemaId>,
1034        operation: AlterDefaultPrivilegeOperation,
1035        granted_by: UserId,
1036    ) -> Result<()> {
1037        let request = AlterDefaultPrivilegeRequest {
1038            user_ids,
1039            database_id,
1040            schema_ids,
1041            operation: Some(operation),
1042            granted_by,
1043        };
1044        self.inner.alter_default_privilege(request).await?;
1045        Ok(())
1046    }
1047
1048    /// Unregister the current node from the cluster.
1049    pub async fn unregister(&self) -> Result<()> {
1050        let request = DeleteWorkerNodeRequest {
1051            host: Some(self.host_addr.to_protobuf()),
1052        };
1053        self.inner.delete_worker_node(request).await?;
1054        self.shutting_down.store(true, Relaxed);
1055        Ok(())
1056    }
1057
1058    /// Try to unregister the current worker from the cluster on a best-effort basis and log the result.
1059    pub async fn try_unregister(&self) {
1060        match self.unregister().await {
1061            Ok(_) => {
1062                tracing::info!(
1063                    worker_id = %self.worker_id(),
1064                    "successfully unregistered from meta service",
1065                )
1066            }
1067            Err(e) => {
1068                tracing::warn!(
1069                    error = %e.as_report(),
1070                    worker_id = %self.worker_id(),
1071                    "failed to unregister from meta service",
1072                );
1073            }
1074        }
1075    }
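
    // Sketch of a graceful shutdown sequence combining the two methods above with
    // the heartbeat loop started by `start_heartbeat_loop` below (illustrative;
    // the handle and sender names are assumptions):
    //
    //     let _ = heartbeat_shutdown_tx.send(()); // stop sending heartbeats
    //     heartbeat_join_handle.await.ok();
    //     meta_client.try_unregister().await;     // best-effort deregistration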
1076
1077    pub async fn list_worker_nodes(
1078        &self,
1079        worker_type: Option<WorkerType>,
1080    ) -> Result<Vec<WorkerNode>> {
1081        let request = ListAllNodesRequest {
1082            worker_type: worker_type.map(Into::into),
1083            include_starting_nodes: true,
1084        };
1085        let resp = self.inner.list_all_nodes(request).await?;
1086        Ok(resp.nodes)
1087    }
1088
1089    /// Starts a heartbeat loop in a background task, returning its join handle and a shutdown sender.
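    ///
    /// A minimal sketch of running and later stopping the loop (illustrative):
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    /// // ... on shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.ok();
    /// ```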
1090    pub fn start_heartbeat_loop(
1091        meta_client: MetaClient,
1092        min_interval: Duration,
1093    ) -> (JoinHandle<()>, Sender<()>) {
1094        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1095        let join_handle = tokio::spawn(async move {
1096            let mut min_interval_ticker = tokio::time::interval(min_interval);
1097            loop {
1098                tokio::select! {
1099                    biased;
1100                    // Shutdown
1101                    _ = &mut shutdown_rx => {
1102                        tracing::info!("Heartbeat loop is stopped");
1103                        return;
1104                    }
1105                    // Wait for interval
1106                    _ = min_interval_ticker.tick() => {},
1107                }
1108                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
1109                match tokio::time::timeout(
1110                    // TODO: decide on a better timeout than `min_interval * 3`
1111                    min_interval * 3,
1112                    meta_client.send_heartbeat(),
1113                )
1114                .await
1115                {
1116                    Ok(Ok(_)) => {}
1117                    Ok(Err(err)) => {
1118                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
1119                    }
1120                    Err(_) => {
1121                        tracing::warn!("Failed to send_heartbeat: timeout");
1122                    }
1123                }
1124            }
1125        });
1126        (join_handle, shutdown_tx)
1127    }
1128
1129    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1130        let request = RisectlListStateTablesRequest {};
1131        let resp = self.inner.risectl_list_state_tables(request).await?;
1132        Ok(resp.tables)
1133    }
1134
1135    pub async fn risectl_resume_backfill(
1136        &self,
1137        request: RisectlResumeBackfillRequest,
1138    ) -> Result<()> {
1139        self.inner.risectl_resume_backfill(request).await?;
1140        Ok(())
1141    }
1142
1143    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1144        let request = FlushRequest { database_id };
1145        let resp = self.inner.flush(request).await?;
1146        Ok(resp.hummock_version_id)
1147    }
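
    // Sketch: request a flush for one database and capture the resulting Hummock
    // version id (illustrative caller code):
    //
    //     let version_id = meta_client.flush(database_id).await?;
    //     tracing::debug!(?version_id, "flush completed");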
1148
1149    pub async fn wait(&self) -> Result<WaitVersion> {
1150        let request = WaitRequest {};
1151        let resp = self.inner.wait(request).await?;
1152        Ok(resp
1153            .version
1154            .ok_or_else(|| anyhow!("wait version not set"))?)
1155    }
1156
1157    pub async fn recover(&self) -> Result<()> {
1158        let request = RecoverRequest {};
1159        self.inner.recover(request).await?;
1160        Ok(())
1161    }
1162
1163    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1164        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1165        let resp = self.inner.cancel_creating_jobs(request).await?;
1166        Ok(resp.canceled_jobs)
1167    }
1168
1169    pub async fn list_table_fragments(
1170        &self,
1171        job_ids: &[JobId],
1172    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
1173        let request = ListTableFragmentsRequest {
1174            table_ids: job_ids.to_vec(),
1175        };
1176        let resp = self.inner.list_table_fragments(request).await?;
1177        Ok(resp.table_fragments)
1178    }
1179
1180    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1181        let resp = self
1182            .inner
1183            .list_streaming_job_states(ListStreamingJobStatesRequest {})
1184            .await?;
1185        Ok(resp.states)
1186    }
1187
1188    pub async fn list_fragment_distributions(
1189        &self,
1190        include_node: bool,
1191    ) -> Result<Vec<FragmentDistribution>> {
1192        let resp = self
1193            .inner
1194            .list_fragment_distribution(ListFragmentDistributionRequest {
1195                include_node: Some(include_node),
1196            })
1197            .await?;
1198        Ok(resp.distributions)
1199    }
1200
1201    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
1202        let resp = self
1203            .inner
1204            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {
1205                include_node: Some(true),
1206            })
1207            .await?;
1208        Ok(resp.distributions)
1209    }
1210
1211    pub async fn get_fragment_by_id(
1212        &self,
1213        fragment_id: FragmentId,
1214    ) -> Result<Option<FragmentDistribution>> {
1215        let resp = self
1216            .inner
1217            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1218            .await?;
1219        Ok(resp.distribution)
1220    }
1221
1222    pub async fn get_fragment_vnodes(
1223        &self,
1224        fragment_id: FragmentId,
1225    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
1226        let resp = self
1227            .inner
1228            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
1229            .await?;
1230        Ok(resp
1231            .actor_vnodes
1232            .into_iter()
1233            .map(|actor| (actor.actor_id, actor.vnode_indices))
1234            .collect())
1235    }
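
    // Sketch: inspect the vnode distribution returned by `get_fragment_vnodes`
    // (illustrative caller code):
    //
    //     for (actor_id, vnodes) in meta_client.get_fragment_vnodes(fragment_id).await? {
    //         println!("actor {:?} owns {} vnodes", actor_id, vnodes.len());
    //     }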
1236
1237    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
1238        let resp = self
1239            .inner
1240            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
1241            .await?;
1242        Ok(resp.vnode_indices)
1243    }
1244
1245    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1246        let resp = self
1247            .inner
1248            .list_actor_states(ListActorStatesRequest {})
1249            .await?;
1250        Ok(resp.states)
1251    }
1252
1253    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1254        let resp = self
1255            .inner
1256            .list_actor_splits(ListActorSplitsRequest {})
1257            .await?;
1258
1259        Ok(resp.actor_splits)
1260    }
1261
1262    pub async fn pause(&self) -> Result<PauseResponse> {
1263        let request = PauseRequest {};
1264        let resp = self.inner.pause(request).await?;
1265        Ok(resp)
1266    }
1267
1268    pub async fn resume(&self) -> Result<ResumeResponse> {
1269        let request = ResumeRequest {};
1270        let resp = self.inner.resume(request).await?;
1271        Ok(resp)
1272    }
1273
1274    pub async fn apply_throttle(
1275        &self,
1276        throttle_target: PbThrottleTarget,
1277        throttle_type: risingwave_pb::common::PbThrottleType,
1278        id: u32,
1279        rate: Option<u32>,
1280    ) -> Result<ApplyThrottleResponse> {
1281        let request = ApplyThrottleRequest {
1282            throttle_target: throttle_target as i32,
1283            throttle_type: throttle_type as i32,
1284            id,
1285            rate,
1286        };
1287        let resp = self.inner.apply_throttle(request).await?;
1288        Ok(resp)
1289    }
1290
1291    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1292        let resp = self
1293            .inner
1294            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1295            .await?;
1296        Ok(resp.get_status().unwrap())
1297    }
1298
1299    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1300        let request = GetClusterInfoRequest {};
1301        let resp = self.inner.get_cluster_info(request).await?;
1302        Ok(resp)
1303    }
1304
1305    pub async fn reschedule(
1306        &self,
1307        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1308        revision: u64,
1309        resolve_no_shuffle_upstream: bool,
1310    ) -> Result<(bool, u64)> {
1311        let request = RescheduleRequest {
1312            revision,
1313            resolve_no_shuffle_upstream,
1314            worker_reschedules,
1315        };
1316        let resp = self.inner.reschedule(request).await?;
1317        Ok((resp.success, resp.revision))
1318    }
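
    // Sketch: apply a manual worker-level reschedule guarded by a previously obtained
    // revision (illustrative; the reschedule map and revision are assumed to be built
    // elsewhere):
    //
    //     let (success, new_revision) = meta_client
    //         .reschedule(worker_reschedules, current_revision, true)
    //         .await?;
    //     if !success {
    //         // Revision mismatch or rejected plan: refresh cluster info and retry if desired.
    //     }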
1319
1320    pub async fn risectl_get_pinned_versions_summary(
1321        &self,
1322    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1323        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1324        self.inner
1325            .rise_ctl_get_pinned_versions_summary(request)
1326            .await
1327    }
1328
1329    pub async fn risectl_get_checkpoint_hummock_version(
1330        &self,
1331    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1332        let request = RiseCtlGetCheckpointVersionRequest {};
1333        self.inner.rise_ctl_get_checkpoint_version(request).await
1334    }
1335
1336    pub async fn risectl_pause_hummock_version_checkpoint(
1337        &self,
1338    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1339        let request = RiseCtlPauseVersionCheckpointRequest {};
1340        self.inner.rise_ctl_pause_version_checkpoint(request).await
1341    }
1342
1343    pub async fn risectl_resume_hummock_version_checkpoint(
1344        &self,
1345    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1346        let request = RiseCtlResumeVersionCheckpointRequest {};
1347        self.inner.rise_ctl_resume_version_checkpoint(request).await
1348    }
1349
1350    pub async fn init_metadata_for_replay(
1351        &self,
1352        tables: Vec<PbTable>,
1353        compaction_groups: Vec<CompactionGroupInfo>,
1354    ) -> Result<()> {
1355        let req = InitMetadataForReplayRequest {
1356            tables,
1357            compaction_groups,
1358        };
1359        let _resp = self.inner.init_metadata_for_replay(req).await?;
1360        Ok(())
1361    }
1362
1363    pub async fn replay_version_delta(
1364        &self,
1365        version_delta: HummockVersionDelta,
1366    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1367        let req = ReplayVersionDeltaRequest {
1368            version_delta: Some(version_delta.into()),
1369        };
1370        let resp = self.inner.replay_version_delta(req).await?;
1371        Ok((
1372            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1373            resp.modified_compaction_groups,
1374        ))
1375    }
1376
1377    pub async fn list_version_deltas(
1378        &self,
1379        start_id: HummockVersionId,
1380        num_limit: u32,
1381        committed_epoch_limit: HummockEpoch,
1382    ) -> Result<Vec<HummockVersionDelta>> {
1383        let req = ListVersionDeltasRequest {
1384            start_id,
1385            num_limit,
1386            committed_epoch_limit,
1387        };
1388        Ok(self
1389            .inner
1390            .list_version_deltas(req)
1391            .await?
1392            .version_deltas
1393            .unwrap()
1394            .version_deltas
1395            .iter()
1396            .map(HummockVersionDelta::from_rpc_protobuf)
1397            .collect())
1398    }
1399
1400    pub async fn trigger_compaction_deterministic(
1401        &self,
1402        version_id: HummockVersionId,
1403        compaction_groups: Vec<CompactionGroupId>,
1404    ) -> Result<()> {
1405        let req = TriggerCompactionDeterministicRequest {
1406            version_id,
1407            compaction_groups,
1408        };
1409        self.inner.trigger_compaction_deterministic(req).await?;
1410        Ok(())
1411    }
1412
1413    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1414        let req = DisableCommitEpochRequest {};
1415        Ok(HummockVersion::from_rpc_protobuf(
1416            &self
1417                .inner
1418                .disable_commit_epoch(req)
1419                .await?
1420                .current_version
1421                .unwrap(),
1422        ))
1423    }
1424
1425    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1426        let req = GetAssignedCompactTaskNumRequest {};
1427        let resp = self.inner.get_assigned_compact_task_num(req).await?;
1428        Ok(resp.num_tasks as usize)
1429    }
1430
1431    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1432        let req = RiseCtlListCompactionGroupRequest {};
1433        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1434        Ok(resp.compaction_groups)
1435    }
1436
1437    pub async fn risectl_update_compaction_config(
1438        &self,
1439        compaction_groups: &[CompactionGroupId],
1440        configs: &[MutableConfig],
1441    ) -> Result<()> {
1442        let req = RiseCtlUpdateCompactionConfigRequest {
1443            compaction_group_ids: compaction_groups.to_vec(),
1444            configs: configs
1445                .iter()
1446                .map(
1447                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1448                        mutable_config: Some(c.clone()),
1449                    },
1450                )
1451                .collect(),
1452        };
1453        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1454        Ok(())
1455    }
1456
1457    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1458        let req = BackupMetaRequest { remarks };
1459        let resp = self.inner.backup_meta(req).await?;
1460        Ok(resp.job_id)
1461    }
1462
1463    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1464        let req = GetBackupJobStatusRequest { job_id };
1465        let resp = self.inner.get_backup_job_status(req).await?;
1466        Ok((resp.job_status(), resp.message))
1467    }
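
    // Sketch: start a metadata backup and poll its status (illustrative; the polling
    // interval and the handling of terminal states are up to the caller):
    //
    //     let job_id = meta_client.backup_meta(Some("nightly".to_owned())).await?;
    //     loop {
    //         let (status, msg) = meta_client.get_backup_job_status(job_id).await?;
    //         tracing::info!(?status, %msg, "backup job status");
    //         // Break here once `status` reaches a terminal state.
    //         tokio::time::sleep(Duration::from_secs(1)).await;
    //     }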
1468
1469    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1470        let req = DeleteMetaSnapshotRequest {
1471            snapshot_ids: snapshot_ids.to_vec(),
1472        };
1473        let _resp = self.inner.delete_meta_snapshot(req).await?;
1474        Ok(())
1475    }
1476
1477    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1478        let req = GetMetaSnapshotManifestRequest {};
1479        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1480        Ok(resp.manifest.expect("should exist"))
1481    }
1482
1483    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1484        let req = GetTelemetryInfoRequest {};
1485        let resp = self.inner.get_telemetry_info(req).await?;
1486        Ok(resp)
1487    }
1488
1489    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1490        let req = GetMetaStoreInfoRequest {};
1491        let resp = self.inner.get_meta_store_info(req).await?;
1492        Ok(resp.meta_store_endpoint)
1493    }
1494
1495    pub async fn alter_sink_props(
1496        &self,
1497        sink_id: SinkId,
1498        changed_props: BTreeMap<String, String>,
1499        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1500        connector_conn_ref: Option<ConnectionId>,
1501    ) -> Result<()> {
1502        let req = AlterConnectorPropsRequest {
1503            object_id: sink_id.as_raw_id(),
1504            changed_props: changed_props.into_iter().collect(),
1505            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1506            connector_conn_ref,
1507            object_type: AlterConnectorPropsObject::Sink as i32,
1508            extra_options: None,
1509        };
1510        let _resp = self.inner.alter_connector_props(req).await?;
1511        Ok(())
1512    }
1513
1514    pub async fn alter_iceberg_table_props(
1515        &self,
1516        table_id: TableId,
1517        sink_id: SinkId,
1518        source_id: SourceId,
1519        changed_props: BTreeMap<String, String>,
1520        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1521        connector_conn_ref: Option<ConnectionId>,
1522    ) -> Result<()> {
1523        let req = AlterConnectorPropsRequest {
1524            object_id: table_id.as_raw_id(),
1525            changed_props: changed_props.into_iter().collect(),
1526            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1527            connector_conn_ref,
1528            object_type: AlterConnectorPropsObject::IcebergTable as i32,
1529            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1530                sink_id,
1531                source_id,
1532            })),
1533        };
1534        let _resp = self.inner.alter_connector_props(req).await?;
1535        Ok(())
1536    }
1537
1538    pub async fn alter_source_connector_props(
1539        &self,
1540        source_id: SourceId,
1541        changed_props: BTreeMap<String, String>,
1542        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1543        connector_conn_ref: Option<ConnectionId>,
1544    ) -> Result<()> {
1545        let req = AlterConnectorPropsRequest {
1546            object_id: source_id.as_raw_id(),
1547            changed_props: changed_props.into_iter().collect(),
1548            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1549            connector_conn_ref,
1550            object_type: AlterConnectorPropsObject::Source as i32,
1551            extra_options: None,
1552        };
1553        let _resp = self.inner.alter_connector_props(req).await?;
1554        Ok(())
1555    }
1556
1557    pub async fn alter_connection_connector_props(
1558        &self,
1559        connection_id: u32,
1560        changed_props: BTreeMap<String, String>,
1561        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1562    ) -> Result<()> {
1563        let req = AlterConnectorPropsRequest {
1564            object_id: connection_id,
1565            changed_props: changed_props.into_iter().collect(),
1566            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1567            connector_conn_ref: None, // Connections don't reference other connections
1568            object_type: AlterConnectorPropsObject::Connection as i32,
1569            extra_options: None,
1570        };
1571        let _resp = self.inner.alter_connector_props(req).await?;
1572        Ok(())
1573    }
1574
    /// Orchestrates a source property update with a pause/update/resume workflow.
    /// This is the "safe" variant: the source is paused before the update is applied and resumed afterwards.
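    ///
    /// # Example
    ///
    /// A minimal usage sketch (not part of the original documentation), assuming an
    /// already registered `meta_client`, a known `source_id`, and an illustrative
    /// property name:
    ///
    /// ```ignore
    /// let mut props = BTreeMap::new();
    /// props.insert("properties.bootstrap.server".to_owned(), "new-broker:9092".to_owned());
    /// meta_client
    ///     .alter_source_properties_safe(source_id, props, BTreeMap::new(), false)
    ///     .await?;
    /// ```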
1577    pub async fn alter_source_properties_safe(
1578        &self,
1579        source_id: SourceId,
1580        changed_props: BTreeMap<String, String>,
1581        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1582        reset_splits: bool,
1583    ) -> Result<()> {
1584        let req = AlterSourcePropertiesSafeRequest {
1585            source_id: source_id.as_raw_id(),
1586            changed_props: changed_props.into_iter().collect(),
1587            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1588            options: Some(PropertyUpdateOptions { reset_splits }),
1589        };
1590        let _resp = self.inner.alter_source_properties_safe(req).await?;
1591        Ok(())
1592    }
1593
1594    /// Reset source split assignments (UNSAFE - admin only).
1595    /// This clears persisted split metadata and triggers re-discovery.
1596    pub async fn reset_source_splits(&self, source_id: SourceId) -> Result<()> {
1597        let req = ResetSourceSplitsRequest {
1598            source_id: source_id.as_raw_id(),
1599        };
1600        let _resp = self.inner.reset_source_splits(req).await?;
1601        Ok(())
1602    }
1603
1604    /// Inject specific offsets into source splits (UNSAFE - admin only).
1605    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
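    ///
    /// # Example
    ///
    /// A minimal sketch with hypothetical split ids and offsets, assuming a
    /// Kafka-like source whose splits are identified by partition id:
    ///
    /// ```ignore
    /// let mut split_offsets = HashMap::new();
    /// split_offsets.insert("0".to_owned(), "12345".to_owned());
    /// let applied = meta_client.inject_source_offsets(source_id, split_offsets).await?;
    /// tracing::info!(?applied, "split offsets injected");
    /// ```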
1606    pub async fn inject_source_offsets(
1607        &self,
1608        source_id: SourceId,
1609        split_offsets: HashMap<String, String>,
1610    ) -> Result<Vec<String>> {
1611        let req = InjectSourceOffsetsRequest {
1612            source_id: source_id.as_raw_id(),
1613            split_offsets,
1614        };
1615        let resp = self.inner.inject_source_offsets(req).await?;
1616        Ok(resp.applied_split_ids)
1617    }
1618
1619    pub async fn set_system_param(
1620        &self,
1621        param: String,
1622        value: Option<String>,
1623    ) -> Result<Option<SystemParamsReader>> {
1624        let req = SetSystemParamRequest { param, value };
1625        let resp = self.inner.set_system_param(req).await?;
1626        Ok(resp.params.map(SystemParamsReader::from))
1627    }
1628
1629    pub async fn get_session_params(&self) -> Result<String> {
1630        let req = GetSessionParamsRequest {};
1631        let resp = self.inner.get_session_params(req).await?;
1632        Ok(resp.params)
1633    }
1634
1635    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1636        let req = SetSessionParamRequest { param, value };
1637        let resp = self.inner.set_session_param(req).await?;
1638        Ok(resp.param)
1639    }
1640
1641    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1642        let req = GetDdlProgressRequest {};
1643        let resp = self.inner.get_ddl_progress(req).await?;
1644        Ok(resp.ddl_progress)
1645    }
1646
1647    pub async fn split_compaction_group(
1648        &self,
1649        group_id: CompactionGroupId,
1650        table_ids_to_new_group: &[TableId],
1651        partition_vnode_count: u32,
1652    ) -> Result<CompactionGroupId> {
1653        let req = SplitCompactionGroupRequest {
1654            group_id,
1655            table_ids: table_ids_to_new_group.to_vec(),
1656            partition_vnode_count,
1657        };
1658        let resp = self.inner.split_compaction_group(req).await?;
1659        Ok(resp.new_group_id)
1660    }
1661
1662    pub async fn get_tables(
1663        &self,
1664        table_ids: Vec<TableId>,
1665        include_dropped_tables: bool,
1666    ) -> Result<HashMap<TableId, Table>> {
1667        let req = GetTablesRequest {
1668            table_ids,
1669            include_dropped_tables,
1670        };
1671        let resp = self.inner.get_tables(req).await?;
1672        Ok(resp.tables)
1673    }
1674
1675    pub async fn list_serving_vnode_mappings(
1676        &self,
1677    ) -> Result<HashMap<FragmentId, (JobId, WorkerSlotMapping)>> {
1678        let req = GetServingVnodeMappingsRequest {};
1679        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1680        let mappings = resp
1681            .worker_slot_mappings
1682            .into_iter()
1683            .map(|p| {
1684                (
1685                    p.fragment_id,
1686                    (
1687                        resp.fragment_to_table
1688                            .get(&p.fragment_id)
1689                            .cloned()
1690                            .unwrap_or_default(),
1691                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1692                    ),
1693                )
1694            })
1695            .collect();
1696        Ok(mappings)
1697    }
1698
1699    pub async fn risectl_list_compaction_status(
1700        &self,
1701    ) -> Result<(
1702        Vec<CompactStatus>,
1703        Vec<CompactTaskAssignment>,
1704        Vec<CompactTaskProgress>,
1705    )> {
1706        let req = RiseCtlListCompactionStatusRequest {};
1707        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1708        Ok((
1709            resp.compaction_statuses,
1710            resp.task_assignment,
1711            resp.task_progress,
1712        ))
1713    }
1714
1715    pub async fn get_compaction_score(
1716        &self,
1717        compaction_group_id: CompactionGroupId,
1718    ) -> Result<Vec<PickerInfo>> {
1719        let req = GetCompactionScoreRequest {
1720            compaction_group_id,
1721        };
1722        let resp = self.inner.get_compaction_score(req).await?;
1723        Ok(resp.scores)
1724    }
1725
1726    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1727        let req = RiseCtlRebuildTableStatsRequest {};
1728        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1729        Ok(())
1730    }
1731
1732    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1733        let req = ListBranchedObjectRequest {};
1734        let resp = self.inner.list_branched_object(req).await?;
1735        Ok(resp.branched_objects)
1736    }
1737
1738    pub async fn list_active_write_limit(&self) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
1739        let req = ListActiveWriteLimitRequest {};
1740        let resp = self.inner.list_active_write_limit(req).await?;
1741        Ok(resp.write_limits)
1742    }
1743
1744    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1745        let req = ListHummockMetaConfigRequest {};
1746        let resp = self.inner.list_hummock_meta_config(req).await?;
1747        Ok(resp.configs)
1748    }
1749
1750    pub async fn get_table_change_logs(
1751        &self,
1752        epoch_only: bool,
1753        start_epoch_inclusive: Option<u64>,
1754        end_epoch_inclusive: Option<u64>,
1755        table_ids: Option<HashSet<TableId>>,
1756        exclude_empty: bool,
1757        limit: Option<u32>,
1758    ) -> Result<TableChangeLogs> {
1759        let req = GetTableChangeLogsRequest {
1760            epoch_only,
1761            start_epoch_inclusive,
1762            end_epoch_inclusive,
1763            table_ids: table_ids.map(|iter| PbTableFilter {
1764                table_ids: iter.into_iter().collect::<Vec<_>>(),
1765            }),
1766            exclude_empty,
1767            limit,
1768        };
1769        let resp = self.inner.get_table_change_logs(req).await?;
1770        Ok(resp
1771            .table_change_logs
1772            .into_iter()
1773            .map(|(id, change_log)| (TableId::new(id), TableChangeLog::from_protobuf(&change_log)))
1774            .collect())
1775    }
1776
1777    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1778        let _resp = self
1779            .inner
1780            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1781            .await?;
1782
1783        Ok(())
1784    }
1785
1786    pub async fn rw_cloud_validate_source(
1787        &self,
1788        source_type: SourceType,
1789        source_config: HashMap<String, String>,
1790    ) -> Result<RwCloudValidateSourceResponse> {
1791        let req = RwCloudValidateSourceRequest {
1792            source_type: source_type.into(),
1793            source_config,
1794        };
1795        let resp = self.inner.rw_cloud_validate_source(req).await?;
1796        Ok(resp)
1797    }
1798
1799    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1800        self.inner.core.read().await.sink_coordinate_client.clone()
1801    }
1802
1803    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1804        let req = ListCompactTaskAssignmentRequest {};
1805        let resp = self.inner.list_compact_task_assignment(req).await?;
1806        Ok(resp.task_assignment)
1807    }
1808
1809    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1810        let req = ListEventLogRequest::default();
1811        let resp = self.inner.list_event_log(req).await?;
1812        Ok(resp.event_logs)
1813    }
1814
1815    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1816        let req = ListCompactTaskProgressRequest {};
1817        let resp = self.inner.list_compact_task_progress(req).await?;
1818        Ok(resp.task_progress)
1819    }
1820
1821    #[cfg(madsim)]
1822    pub fn try_add_panic_event_blocking(
1823        &self,
1824        panic_info: impl Display,
1825        timeout_millis: Option<u64>,
1826    ) {
1827    }
1828
    /// If `timeout_millis` is `None`, a default of 1000 ms is used.
    /// Blocks the calling thread until the event is reported or the timeout elapses.
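    ///
    /// # Example
    ///
    /// A sketch of how a panic hook might report the panic (assumed usage; the
    /// actual hook installation lives elsewhere):
    ///
    /// ```ignore
    /// let meta_client = meta_client.clone();
    /// std::panic::set_hook(Box::new(move |info| {
    ///     meta_client.try_add_panic_event_blocking(info, None);
    /// }));
    /// ```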
1830    #[cfg(not(madsim))]
1831    pub fn try_add_panic_event_blocking(
1832        &self,
1833        panic_info: impl Display,
1834        timeout_millis: Option<u64>,
1835    ) {
1836        let event = event_log::EventWorkerNodePanic {
1837            worker_id: self.worker_id,
1838            worker_type: self.worker_type.into(),
1839            host_addr: Some(self.host_addr.to_protobuf()),
1840            panic_info: format!("{panic_info}"),
1841        };
1842        let grpc_meta_client = self.inner.clone();
1843        let _ = thread::spawn(move || {
1844            let rt = tokio::runtime::Builder::new_current_thread()
1845                .enable_all()
1846                .build()
1847                .unwrap();
1848            let req = AddEventLogRequest {
1849                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1850            };
1851            rt.block_on(async {
1852                let _ = tokio::time::timeout(
1853                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1854                    grpc_meta_client.add_event_log(req),
1855                )
1856                .await;
1857            });
1858        })
1859        .join();
1860    }
1861
1862    pub async fn add_sink_fail_evet(
1863        &self,
1864        sink_id: SinkId,
1865        sink_name: String,
1866        connector: String,
1867        error: String,
1868    ) -> Result<()> {
1869        let event = event_log::EventSinkFail {
1870            sink_id,
1871            sink_name,
1872            connector,
1873            error,
1874        };
1875        let req = AddEventLogRequest {
1876            event: Some(add_event_log_request::Event::SinkFail(event)),
1877        };
1878        self.inner.add_event_log(req).await?;
1879        Ok(())
1880    }
1881
1882    pub async fn add_cdc_auto_schema_change_fail_event(
1883        &self,
1884        source_id: SourceId,
1885        table_name: String,
1886        cdc_table_id: String,
1887        upstream_ddl: String,
1888        fail_info: String,
1889    ) -> Result<()> {
1890        let event = event_log::EventAutoSchemaChangeFail {
1891            table_id: source_id.as_cdc_table_id(),
1892            table_name,
1893            cdc_table_id,
1894            upstream_ddl,
1895            fail_info,
1896        };
1897        let req = AddEventLogRequest {
1898            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1899        };
1900        self.inner.add_event_log(req).await?;
1901        Ok(())
1902    }
1903
1904    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1905        let req = CancelCompactTaskRequest {
1906            task_id,
1907            task_status: task_status as _,
1908        };
1909        let resp = self.inner.cancel_compact_task(req).await?;
1910        Ok(resp.ret)
1911    }
1912
1913    pub async fn get_version_by_epoch(
1914        &self,
1915        epoch: HummockEpoch,
1916        table_id: TableId,
1917    ) -> Result<PbHummockVersion> {
1918        let req = GetVersionByEpochRequest { epoch, table_id };
1919        let resp = self.inner.get_version_by_epoch(req).await?;
1920        Ok(resp.version.unwrap())
1921    }
1922
1923    pub async fn get_cluster_limits(
1924        &self,
1925    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1926        let req = GetClusterLimitsRequest {};
1927        let resp = self.inner.get_cluster_limits(req).await?;
1928        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1929    }
1930
1931    pub async fn merge_compaction_group(
1932        &self,
1933        left_group_id: CompactionGroupId,
1934        right_group_id: CompactionGroupId,
1935    ) -> Result<()> {
1936        let req = MergeCompactionGroupRequest {
1937            left_group_id,
1938            right_group_id,
1939        };
1940        self.inner.merge_compaction_group(req).await?;
1941        Ok(())
1942    }
1943
1944    /// List all rate limits for sources and backfills
1945    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1946        let request = ListRateLimitsRequest {};
1947        let resp = self.inner.list_rate_limits(request).await?;
1948        Ok(resp.rate_limits)
1949    }
1950
1951    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1952        let request = ListCdcProgressRequest {};
1953        let resp = self.inner.list_cdc_progress(request).await?;
1954        Ok(resp.cdc_progress)
1955    }
1956
1957    pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1958        let request = ListRefreshTableStatesRequest {};
1959        let resp = self.inner.list_refresh_table_states(request).await?;
1960        Ok(resp.states)
1961    }
1962
1963    pub async fn list_iceberg_compaction_status(
1964        &self,
1965    ) -> Result<Vec<list_iceberg_compaction_status_response::IcebergCompactionStatus>> {
1966        let request = ListIcebergCompactionStatusRequest {};
1967        let resp = self.inner.list_iceberg_compaction_status(request).await?;
1968        Ok(resp.statuses)
1969    }
1970
1971    pub async fn list_sink_log_store_tables(
1972        &self,
1973    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1974        let request = ListSinkLogStoreTablesRequest {};
1975        let resp = self.inner.list_sink_log_store_tables(request).await?;
1976        Ok(resp.tables)
1977    }
1978
1979    pub async fn create_iceberg_table(
1980        &self,
1981        table_job_info: PbTableJobInfo,
1982        sink_job_info: PbSinkJobInfo,
1983        iceberg_source: PbSource,
1984        if_not_exists: bool,
1985    ) -> Result<WaitVersion> {
1986        let request = CreateIcebergTableRequest {
1987            table_info: Some(table_job_info),
1988            sink_info: Some(sink_job_info),
1989            iceberg_source: Some(iceberg_source),
1990            if_not_exists,
1991        };
1992
1993        let resp = Box::pin(self.inner.create_iceberg_table(request)).await?;
1994        Ok(resp
1995            .version
1996            .ok_or_else(|| anyhow!("wait version not set"))?)
1997    }
1998
1999    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
2000        let request = ListIcebergTablesRequest {};
2001        let resp = self.inner.list_iceberg_tables(request).await?;
2002        Ok(resp.iceberg_tables)
2003    }
2004
2005    /// Get the await tree of all nodes in the cluster.
2006    pub async fn get_cluster_stack_trace(
2007        &self,
2008        actor_traces_format: ActorTracesFormat,
2009    ) -> Result<StackTraceResponse> {
2010        let request = StackTraceRequest {
2011            actor_traces_format: actor_traces_format as i32,
2012        };
2013        let resp = self.inner.stack_trace(request).await?;
2014        Ok(resp)
2015    }
2016
2017    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
2018        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
2019        self.inner.set_sync_log_store_aligned(request).await?;
2020        Ok(())
2021    }
2022
2023    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
2024        self.inner.refresh(request).await
2025    }
2026
2027    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
2028        let request = ListUnmigratedTablesRequest {};
2029        let resp = self.inner.list_unmigrated_tables(request).await?;
2030        Ok(resp
2031            .tables
2032            .into_iter()
2033            .map(|table| (table.table_id, table.table_name))
2034            .collect())
2035    }
2036}
2037
2038#[async_trait]
2039impl HummockMetaClient for MetaClient {
2040    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
2041        let req = UnpinVersionBeforeRequest {
2042            context_id: self.worker_id(),
2043            unpin_version_before,
2044        };
2045        self.inner.unpin_version_before(req).await?;
2046        Ok(())
2047    }
2048
2049    async fn get_current_version(&self) -> Result<HummockVersion> {
2050        let req = GetCurrentVersionRequest::default();
2051        Ok(HummockVersion::from_rpc_protobuf(
2052            &self
2053                .inner
2054                .get_current_version(req)
2055                .await?
2056                .current_version
2057                .unwrap(),
2058        ))
2059    }
2060
2061    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
2062        let resp = self
2063            .inner
2064            .get_new_object_ids(GetNewObjectIdsRequest { number })
2065            .await?;
2066        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
2067    }
2068
2069    async fn commit_epoch_with_change_log(
2070        &self,
2071        _epoch: HummockEpoch,
2072        _sync_result: SyncResult,
2073        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
2074    ) -> Result<()> {
2075        panic!("Only meta service can commit_epoch in production.")
2076    }
2077
2078    async fn trigger_manual_compaction(
2079        &self,
2080        compaction_group_id: CompactionGroupId,
2081        table_id: JobId,
2082        level: u32,
2083        sst_ids: Vec<HummockSstableId>,
2084        exclusive: bool,
2085    ) -> Result<bool> {
2086        // TODO: support key_range parameter
2087        let req = TriggerManualCompactionRequest {
2088            compaction_group_id,
2089            table_id,
            // If `table_id` does not exist, manual compaction will include all SSTs
            // without checking `internal_table_id`.
2092            level,
2093            sst_ids,
2094            exclusive: Some(exclusive),
2095            ..Default::default()
2096        };
2097
2098        let resp = self.inner.trigger_manual_compaction(req).await?;
2099        Ok(resp.should_retry.unwrap_or(false))
2100    }
2101
2102    async fn trigger_full_gc(
2103        &self,
2104        sst_retention_time_sec: u64,
2105        prefix: Option<String>,
2106    ) -> Result<()> {
2107        self.inner
2108            .trigger_full_gc(TriggerFullGcRequest {
2109                sst_retention_time_sec,
2110                prefix,
2111            })
2112            .await?;
2113        Ok(())
2114    }
2115
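    /// Opens a bidirectional compaction-event stream with the meta server.
    ///
    /// An initial `Register` event carrying this worker's id is sent immediately;
    /// the returned sender can be used to push further requests, and the returned
    /// stream yields the server-side compaction events.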
2116    async fn subscribe_compaction_event(
2117        &self,
2118    ) -> Result<(
2119        UnboundedSender<SubscribeCompactionEventRequest>,
2120        BoxStream<'static, CompactionEventItem>,
2121    )> {
2122        let (request_sender, request_receiver) =
2123            unbounded_channel::<SubscribeCompactionEventRequest>();
2124        request_sender
2125            .send(SubscribeCompactionEventRequest {
2126                event: Some(subscribe_compaction_event_request::Event::Register(
2127                    Register {
2128                        context_id: self.worker_id(),
2129                    },
2130                )),
2131                create_at: SystemTime::now()
2132                    .duration_since(SystemTime::UNIX_EPOCH)
2133                    .expect("Clock may have gone backwards")
2134                    .as_millis() as u64,
2135            })
2136            .context("Failed to subscribe compaction event")?;
2137
2138        let stream = self
2139            .inner
2140            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
2141                request_receiver,
2142            )))
2143            .await?;
2144
2145        Ok((request_sender, Box::pin(stream)))
2146    }
2147
2148    async fn get_version_by_epoch(
2149        &self,
2150        epoch: HummockEpoch,
2151        table_id: TableId,
2152    ) -> Result<PbHummockVersion> {
2153        self.get_version_by_epoch(epoch, table_id).await
2154    }
2155
2156    async fn subscribe_iceberg_compaction_event(
2157        &self,
2158    ) -> Result<(
2159        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2160        BoxStream<'static, IcebergCompactionEventItem>,
2161    )> {
2162        let (request_sender, request_receiver) =
2163            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2164        request_sender
2165            .send(SubscribeIcebergCompactionEventRequest {
2166                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2167                    IcebergRegister {
2168                        context_id: self.worker_id(),
2169                    },
2170                )),
2171                create_at: SystemTime::now()
2172                    .duration_since(SystemTime::UNIX_EPOCH)
2173                    .expect("Clock may have gone backwards")
2174                    .as_millis() as u64,
2175            })
            .context("Failed to subscribe iceberg compaction event")?;
2177
2178        let stream = self
2179            .inner
2180            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2181                request_receiver,
2182            )))
2183            .await?;
2184
2185        Ok((request_sender, Box::pin(stream)))
2186    }
2187
2188    async fn get_table_change_logs(
2189        &self,
2190        epoch_only: bool,
2191        start_epoch_inclusive: Option<u64>,
2192        end_epoch_inclusive: Option<u64>,
2193        table_ids: Option<HashSet<TableId>>,
2194        exclude_empty: bool,
2195        limit: Option<u32>,
2196    ) -> Result<TableChangeLogs> {
2197        self.get_table_change_logs(
2198            epoch_only,
2199            start_epoch_inclusive,
2200            end_epoch_inclusive,
2201            table_ids,
2202            exclude_empty,
2203            limit,
2204        )
2205        .await
2206    }
2207}
2208
2209#[async_trait]
2210impl TelemetryInfoFetcher for MetaClient {
2211    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2212        let resp = self
2213            .get_telemetry_info()
2214            .await
2215            .map_err(|e| e.to_report_string())?;
2216        let tracking_id = resp.get_tracking_id().ok();
2217        Ok(tracking_id.map(|id| id.to_owned()))
2218    }
2219}
2220
2221pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2222
2223#[derive(Debug, Clone)]
2224struct GrpcMetaClientCore {
2225    cluster_client: ClusterServiceClient<Channel>,
2226    meta_member_client: MetaMemberServiceClient<Channel>,
2227    heartbeat_client: HeartbeatServiceClient<Channel>,
2228    ddl_client: DdlServiceClient<Channel>,
2229    hummock_client: HummockManagerServiceClient<Channel>,
2230    notification_client: NotificationServiceClient<Channel>,
2231    stream_client: StreamManagerServiceClient<Channel>,
2232    user_client: UserServiceClient<Channel>,
2233    scale_client: ScaleServiceClient<Channel>,
2234    backup_client: BackupServiceClient<Channel>,
2235    telemetry_client: TelemetryInfoServiceClient<Channel>,
2236    system_params_client: SystemParamsServiceClient<Channel>,
2237    session_params_client: SessionParamServiceClient<Channel>,
2238    serving_client: ServingServiceClient<Channel>,
2239    cloud_client: CloudServiceClient<Channel>,
2240    sink_coordinate_client: SinkCoordinationRpcClient,
2241    event_log_client: EventLogServiceClient<Channel>,
2242    cluster_limit_client: ClusterLimitServiceClient<Channel>,
2243    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
2244    monitor_client: MonitorServiceClient<Channel>,
2245}
2246
2247impl GrpcMetaClientCore {
2248    pub(crate) fn new(channel: Channel) -> Self {
2249        let cluster_client = ClusterServiceClient::new(channel.clone());
2250        let meta_member_client = MetaMemberClient::new(channel.clone());
2251        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2252        let ddl_client =
2253            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2254        let hummock_client =
2255            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2256        let notification_client =
2257            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2258        let stream_client =
2259            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2260        let user_client = UserServiceClient::new(channel.clone());
2261        let scale_client =
2262            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2263        let backup_client = BackupServiceClient::new(channel.clone());
2264        let telemetry_client =
2265            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2266        let system_params_client = SystemParamsServiceClient::new(channel.clone());
2267        let session_params_client = SessionParamServiceClient::new(channel.clone());
2268        let serving_client = ServingServiceClient::new(channel.clone());
2269        let cloud_client = CloudServiceClient::new(channel.clone());
2270        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2271            .max_decoding_message_size(usize::MAX);
2272        let event_log_client = EventLogServiceClient::new(channel.clone());
2273        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2274        let hosted_iceberg_catalog_service_client =
2275            HostedIcebergCatalogServiceClient::new(channel.clone());
2276        let monitor_client = MonitorServiceClient::new(channel);
2277
2278        GrpcMetaClientCore {
2279            cluster_client,
2280            meta_member_client,
2281            heartbeat_client,
2282            ddl_client,
2283            hummock_client,
2284            notification_client,
2285            stream_client,
2286            user_client,
2287            scale_client,
2288            backup_client,
2289            telemetry_client,
2290            system_params_client,
2291            session_params_client,
2292            serving_client,
2293            cloud_client,
2294            sink_coordinate_client,
2295            event_log_client,
2296            cluster_limit_client,
2297            hosted_iceberg_catalog_service_client,
2298            monitor_client,
2299        }
2300    }
2301}
2302
2303/// Client to meta server. Cloning the instance is lightweight.
2304///
/// It is a wrapper around the tonic service clients. See [`crate::meta_rpc_client_method_impl`].
2306#[derive(Debug, Clone)]
2307struct GrpcMetaClient {
2308    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
2309    core: Arc<RwLock<GrpcMetaClientCore>>,
2310}
2311
2312type MetaMemberClient = MetaMemberServiceClient<Channel>;
2313
2314struct MetaMemberGroup {
2315    members: LruCache<http::Uri, Option<MetaMemberClient>>,
2316}
2317
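/// Tracks the known meta members and the current leader, and rebuilds the shared
/// [`GrpcMetaClientCore`] whenever a new leader is discovered.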
2318struct MetaMemberManagement {
2319    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
2320    members: Either<MetaMemberClient, MetaMemberGroup>,
2321    current_leader: http::Uri,
2322    meta_config: Arc<MetaConfig>,
2323}
2324
2325impl MetaMemberManagement {
2326    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2327
2328    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2329        format!("http://{}:{}", addr.host, addr.port)
2330            .parse()
2331            .unwrap()
2332    }
2333
2334    async fn recreate_core(&self, channel: Channel) {
2335        let mut core = self.core_ref.write().await;
2336        *core = GrpcMetaClientCore::new(channel);
2337    }
2338
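    /// Refreshes the member list, either through the single load-balanced client or
    /// by querying the cached member group, and reconnects the client core to the
    /// newly discovered leader if it differs from the current one.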
2339    async fn refresh_members(&mut self) -> Result<()> {
2340        let leader_addr = match self.members.as_mut() {
2341            Either::Left(client) => {
2342                let resp = client
2343                    .to_owned()
2344                    .members(MembersRequest {})
2345                    .await
2346                    .map_err(RpcError::from_meta_status)?;
2347                let resp = resp.into_inner();
2348                resp.members.into_iter().find(|member| member.is_leader)
2349            }
2350            Either::Right(member_group) => {
2351                let mut fetched_members = None;
2352
2353                for (addr, client) in &mut member_group.members {
2354                    let members: Result<_> = try {
2355                        let mut client = match client {
2356                            Some(cached_client) => cached_client.to_owned(),
2357                            None => {
2358                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2359                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2360                                    .await
2361                                    .context("failed to create client")?;
2362                                let new_client: MetaMemberClient =
2363                                    MetaMemberServiceClient::new(channel);
2364                                *client = Some(new_client.clone());
2365
2366                                new_client
2367                            }
2368                        };
2369
2370                        let resp = client
2371                            .members(MembersRequest {})
2372                            .await
2373                            .context("failed to fetch members")?;
2374
2375                        resp.into_inner().members
2376                    };
2377
2378                    let fetched = members.is_ok();
2379                    fetched_members = Some(members);
2380                    if fetched {
2381                        break;
2382                    }
2383                }
2384
2385                let members = fetched_members
2386                    .context("no member available in the list")?
2387                    .context("could not refresh members")?;
2388
2389                // find new leader
2390                let mut leader = None;
2391                for member in members {
2392                    if member.is_leader {
2393                        leader = Some(member.clone());
2394                    }
2395
2396                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    // We deliberately don't evict stale addresses here, so that they can still be tried in some extreme situations.
2398                    if !member_group.members.contains(&addr) {
2399                        tracing::info!("new meta member joined: {}", addr);
2400                        member_group.members.put(addr, None);
2401                    }
2402                }
2403
2404                leader
2405            }
2406        };
2407
2408        if let Some(leader) = leader_addr {
2409            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2410
2411            if discovered_leader != self.current_leader {
2412                tracing::info!("new meta leader {} discovered", discovered_leader);
2413
2414                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2415                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2416                    false,
2417                );
2418
2419                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2420                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2421                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2422                })
2423                .await?;
2424
2425                self.recreate_core(channel).await;
2426                self.current_leader = discovered_leader;
2427            }
2428        }
2429
2430        Ok(())
2431    }
2432}
2433
2434impl GrpcMetaClient {
2435    // See `Endpoint::http2_keep_alive_interval`
2436    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2437    // See `Endpoint::keep_alive_timeout`
2438    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2439    // Retry base interval in ms for connecting to meta server.
2440    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    // Max retry interval in ms for connecting to meta server.
2442    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2443
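    /// Spawns a background task that refreshes the meta member list, either
    /// periodically (when a member group is available) or on demand via the
    /// force-refresh channel.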
2444    fn start_meta_member_monitor(
2445        &self,
2446        init_leader_addr: http::Uri,
2447        members: Either<MetaMemberClient, MetaMemberGroup>,
2448        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2449        meta_config: Arc<MetaConfig>,
2450    ) -> Result<()> {
2451        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2452        let current_leader = init_leader_addr;
2453
2454        let enable_period_tick = matches!(members, Either::Right(_));
2455
2456        let member_management = MetaMemberManagement {
2457            core_ref,
2458            members,
2459            current_leader,
2460            meta_config,
2461        };
2462
2463        let mut force_refresh_receiver = force_refresh_receiver;
2464
2465        tokio::spawn(async move {
2466            let mut member_management = member_management;
2467            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2468
2469            loop {
2470                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2471                    tokio::select! {
2472                        _ = ticker.tick() => None,
2473                        result_sender = force_refresh_receiver.recv() => {
2474                            if result_sender.is_none() {
2475                                break;
2476                            }
2477
2478                            result_sender
2479                        },
2480                    }
2481                } else {
2482                    let result_sender = force_refresh_receiver.recv().await;
2483
2484                    if result_sender.is_none() {
2485                        break;
2486                    }
2487
2488                    result_sender
2489                };
2490
2491                let tick_result = member_management.refresh_members().await;
2492                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2494                }
2495
2496                if let Some(sender) = event {
                    // Ignore the send result: the receiver may have been dropped.
2498                    let _resp = sender.send(tick_result);
2499                }
2500            }
2501        });
2502
2503        Ok(())
2504    }
2505
2506    async fn force_refresh_leader(&self) -> Result<()> {
2507        let (sender, receiver) = oneshot::channel();
2508
2509        self.member_monitor_event_sender
2510            .send(sender)
2511            .await
2512            .map_err(|e| anyhow!(e))?;
2513
2514        receiver.await.map_err(|e| anyhow!(e))?
2515    }
2516
    /// Connect to the meta server using the given address `strategy`.
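    ///
    /// # Example
    ///
    /// A construction sketch for internal use, assuming a single meta address and a
    /// default `MetaConfig` (both illustrative):
    ///
    /// ```ignore
    /// let strategy = MetaAddressStrategy::List(vec!["http://meta-node:5690".parse()?]);
    /// let client = GrpcMetaClient::new(&strategy, Arc::new(MetaConfig::default())).await?;
    /// ```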
2518    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2519        let (channel, addr) = match strategy {
2520            MetaAddressStrategy::LoadBalance(addr) => {
2521                Self::try_build_rpc_channel(vec![addr.clone()]).await
2522            }
2523            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2524        }?;
2525        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2526        let client = GrpcMetaClient {
2527            member_monitor_event_sender: force_refresh_sender,
2528            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2529        };
2530
2531        let meta_member_client = client.core.read().await.meta_member_client.clone();
2532        let members = match strategy {
2533            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2534            MetaAddressStrategy::List(addrs) => {
2535                let mut members = LruCache::new(20.try_into().unwrap());
2536                for addr in addrs {
2537                    members.put(addr.clone(), None);
2538                }
2539                members.put(addr.clone(), Some(meta_member_client));
2540
2541                Either::Right(MetaMemberGroup { members })
2542            }
2543        };
2544
2545        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2546
2547        client.force_refresh_leader().await?;
2548
2549        Ok(client)
2550    }
2551
2552    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2553        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2554    }
2555
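    /// Tries the given addresses in order and returns the first successfully
    /// connected channel together with the address it was built from.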
2556    pub(crate) async fn try_build_rpc_channel(
2557        addrs: impl IntoIterator<Item = http::Uri>,
2558    ) -> Result<(Channel, http::Uri)> {
2559        let endpoints: Vec<_> = addrs
2560            .into_iter()
2561            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2562            .collect();
2563
2564        let mut last_error = None;
2565
2566        for (endpoint, addr) in endpoints {
2567            match Self::connect_to_endpoint(endpoint).await {
2568                Ok(channel) => {
2569                    tracing::info!("Connect to meta server {} successfully", addr);
2570                    return Ok((channel, addr));
2571                }
2572                Err(e) => {
2573                    tracing::warn!(
2574                        error = %e.as_report(),
2575                        "Failed to connect to meta server {}, trying again",
2576                        addr,
2577                    );
2578                    last_error = Some(e);
2579                }
2580            }
2581        }
2582
2583        if let Some(last_error) = last_error {
2584            Err(anyhow::anyhow!(last_error)
2585                .context("failed to connect to all meta servers")
2586                .into())
2587        } else {
2588            bail!("no meta server address provided")
2589        }
2590    }
2591
2592    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2593        let channel = endpoint
2594            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2595            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2596            .connect_timeout(Duration::from_secs(5))
2597            .monitored_connect("grpc-meta-client", Default::default())
2598            .await?
2599            .wrapped();
2600
2601        Ok(channel)
2602    }
2603
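    /// Builds a jittered exponential-backoff iterator whose accumulated delay is
    /// bounded by `high_bound`. When `exceed` is true, the total may overshoot the
    /// bound by at most one final interval.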
2604    pub(crate) fn retry_strategy_to_bound(
2605        high_bound: Duration,
2606        exceed: bool,
2607    ) -> impl Iterator<Item = Duration> {
2608        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2609            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2610            .map(jitter);
2611
2612        let mut sum = Duration::default();
2613
2614        iter.take_while(move |duration| {
2615            sum += *duration;
2616
2617            if exceed {
2618                sum < high_bound + *duration
2619            } else {
2620                sum < high_bound
2621            }
2622        })
2623    }
2624}
2625
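/// Invokes the given macro with one `{ client field, RPC method, request type, response type }`
/// entry per meta RPC; used to generate the delegating RPC methods on [`GrpcMetaClient`].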
2626macro_rules! for_all_meta_rpc {
2627    ($macro:ident) => {
2628        $macro! {
2629             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2630            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2631            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2632            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2633            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2634            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2635            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2636            ,{ stream_client, flush, FlushRequest, FlushResponse }
2637            ,{ stream_client, pause, PauseRequest, PauseResponse }
2638            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2639            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2640            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2641            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2642            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2643            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2644            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2645            ,{ stream_client, list_sink_log_store_tables, ListSinkLogStoreTablesRequest, ListSinkLogStoreTablesResponse }
2646            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2647            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2648            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2649            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2650            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
2651            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
2652            ,{ stream_client, list_iceberg_compaction_status, ListIcebergCompactionStatusRequest, ListIcebergCompactionStatusResponse }
2653            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2654            ,{ stream_client, alter_source_properties_safe, AlterSourcePropertiesSafeRequest, AlterSourcePropertiesSafeResponse }
2655            ,{ stream_client, reset_source_splits, ResetSourceSplitsRequest, ResetSourceSplitsResponse }
2656            ,{ stream_client, inject_source_offsets, InjectSourceOffsetsRequest, InjectSourceOffsetsResponse }
2657            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2658            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
2659            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
2660            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2661            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2662            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
2663            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2664            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2665            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2666            ,{ ddl_client, alter_subscription_retention, AlterSubscriptionRetentionRequest, AlterSubscriptionRetentionResponse }
2667            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2668            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2669            ,{ ddl_client, alter_backfill_parallelism, AlterBackfillParallelismRequest, AlterBackfillParallelismResponse }
2670            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
2671            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
2672            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2673            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2674            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2675            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2676            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2677            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2678            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2679            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2680            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2681            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2682            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2683            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2684            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2685            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2686            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2687            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2688            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2689            ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
2690            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2691            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2692            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2693            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2694            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2695            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2696            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2697            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2698            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2699            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2700            ,{ ddl_client, risectl_resume_backfill, RisectlResumeBackfillRequest, RisectlResumeBackfillResponse }
2701            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2702            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2703            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2704            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2705            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2706            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2707            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2708            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2709            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2710            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2711            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2712            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
2713            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
2714            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2715            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2716            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2717            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2718            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2719            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2720            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2721            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2722            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2723            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2724            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2725            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2726            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2727            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2728            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2729            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2730            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2731            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2732            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2733            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2734            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2735            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2736            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2737            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2738            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2739            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2740            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2741            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2742            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2743            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2744            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2745            ,{ hummock_client, get_table_change_logs, GetTableChangeLogsRequest, GetTableChangeLogsResponse }
2746            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2747            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2748            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2749            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2750            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2751            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2752            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2753            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2754            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2755            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2756            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2757            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
2758            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
2759            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
2760            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2761            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2762            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2763            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2764            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2765            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2766            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2767            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2768            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2769            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2770            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2771        }
2772    };
2773}
2774
2775impl GrpcMetaClient {
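    /// Force-refreshes the underlying meta client when an RPC fails with a status code that
    /// suggests the connected meta node may no longer be serving (`Unknown`, `Unimplemented`,
    /// or `Unavailable`). The request is handed to the member monitor through a oneshot
    /// channel; if a refresh is already in flight, this call skips and lets it finish.
    ///
    /// A typical call site might look like the following sketch (`some_rpc` is a hypothetical
    /// wrapper that surfaces the `tonic::Status` of a failed call):
    ///
    /// ```ignore
    /// if let Err(status) = client.some_rpc(request).await {
    ///     client.refresh_client_if_needed(status.code()).await;
    /// }
    /// ```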
2776    async fn refresh_client_if_needed(&self, code: Code) {
2777        if matches!(
2778            code,
2779            Code::Unknown | Code::Unimplemented | Code::Unavailable
2780        ) {
2781            tracing::debug!("matching tonic code {}", code);
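            // Request a forced refresh from the member monitor; the oneshot carries the
            // refresh result back to this caller.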
2782            let (result_sender, result_receiver) = oneshot::channel();
2783            if self
2784                .member_monitor_event_sender
2785                .try_send(result_sender)
2786                .is_ok()
2787            {
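                // The refresh was scheduled; wait for its outcome and only warn on failure.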
2788                if let Ok(Err(e)) = result_receiver.await {
2789                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2790                }
2791            } else {
2792                tracing::debug!("skipping the current refresh, somewhere else is already doing it")
2793            }
2794        }
2795    }
2796}
2797
2798impl GrpcMetaClient {
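    // Invokes `meta_rpc_client_method_impl` with every RPC entry listed in `for_all_meta_rpc!`,
    // generating the corresponding async client method for each one.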
2799    for_all_meta_rpc! { meta_rpc_client_method_impl }
2800}