risingwave_rpc_client/
meta_client.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{
33    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
34};
35use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
36use risingwave_common::hash::WorkerSlotMapping;
37use risingwave_common::id::{
38    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, UserId, ViewId, WorkerId,
39};
40use risingwave_common::monitor::EndpointExt;
41use risingwave_common::system_param::AdaptiveParallelismStrategy;
42use risingwave_common::system_param::reader::SystemParamsReader;
43use risingwave_common::telemetry::report::TelemetryInfoFetcher;
44use risingwave_common::util::addr::HostAddr;
45use risingwave_common::util::meta_addr::MetaAddressStrategy;
46use risingwave_common::util::resource_util::cpu::total_cpu_available;
47use risingwave_common::util::resource_util::hostname;
48use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
49use risingwave_error::bail;
50use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
51use risingwave_hummock_sdk::change_log::{TableChangeLog, TableChangeLogs};
52use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
53use risingwave_hummock_sdk::{
54    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
55};
56use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
57use risingwave_pb::backup_service::*;
58use risingwave_pb::catalog::{
59    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
60    PbSubscription, PbTable, PbView, Table,
61};
62use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
63use risingwave_pb::cloud_service::*;
64use risingwave_pb::common::worker_node::Property;
65use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
66use risingwave_pb::configured_monitor_service_client;
67use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
68use risingwave_pb::ddl_service::alter_owner_request::Object;
69use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
70use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
71use risingwave_pb::ddl_service::*;
72use risingwave_pb::hummock::compact_task::TaskStatus;
73use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
74use risingwave_pb::hummock::get_table_change_logs_request::PbTableFilter;
75use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
76use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
77use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
78use risingwave_pb::hummock::write_limits::WriteLimit;
79use risingwave_pb::hummock::*;
80use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
81use risingwave_pb::iceberg_compaction::{
82    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
83    subscribe_iceberg_compaction_event_request,
84};
85use risingwave_pb::id::{ActorId, FragmentId, HummockSstableId, SourceId};
86use risingwave_pb::meta::alter_connector_props_request::{
87    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
88};
89use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
90use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
91use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
92use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
93use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
94use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
95use risingwave_pb::meta::list_actor_states_response::ActorState;
96use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
97use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
98use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
99use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
100use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
101use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
102use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
103use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
104use risingwave_pb::meta::serving_service_client::ServingServiceClient;
105use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
106use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
107use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
108use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
109use risingwave_pb::meta::{FragmentDistribution, *};
110use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
111use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
112use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
113use risingwave_pb::secret::PbSecretRef;
114use risingwave_pb::stream_plan::StreamFragmentGraph;
115use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
116use risingwave_pb::user::update_user_request::UpdateField;
117use risingwave_pb::user::user_service_client::UserServiceClient;
118use risingwave_pb::user::*;
119use thiserror_ext::AsReport;
120use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
121use tokio::sync::oneshot::Sender;
122use tokio::sync::{RwLock, mpsc, oneshot};
123use tokio::task::JoinHandle;
124use tokio::time::{self};
125use tokio_retry::strategy::{ExponentialBackoff, jitter};
126use tokio_stream::wrappers::UnboundedReceiverStream;
127use tonic::transport::Endpoint;
128use tonic::{Code, Request, Streaming};
129
130use crate::channel::{Channel, WrappedChannelExt};
131use crate::error::{Result, RpcError};
132use crate::hummock_meta_client::{
133    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
134    IcebergCompactionEventItem,
135};
136use crate::meta_rpc_client_method_impl;
137
/// Client to meta server. Cloning the instance is lightweight.
#[derive(Clone, Debug)]
pub struct MetaClient {
    /// Id of this worker, taken from the `node_id` of meta's `AddWorkerNodeResponse`.
    worker_id: WorkerId,
    /// Worker type this node registered itself as.
    worker_type: WorkerType,
    /// Address this node registered with; it is also sent along with subscribe requests.
    host_addr: HostAddr,
    /// Underlying gRPC client that the RPC methods delegate to.
    inner: GrpcMetaClient,
    /// Meta configuration; its `max_heartbeat_interval_secs` bounds the retry strategies.
    meta_config: Arc<MetaConfig>,
    /// Cluster id taken from meta's `AddWorkerNodeResponse`.
    cluster_id: String,
    /// While set, an `UnknownWorker` heartbeat status no longer makes the process exit.
    shutting_down: Arc<AtomicBool>,
}
149
150impl MetaClient {
151    pub fn worker_id(&self) -> WorkerId {
152        self.worker_id
153    }
154
155    pub fn host_addr(&self) -> &HostAddr {
156        &self.host_addr
157    }
158
159    pub fn worker_type(&self) -> WorkerType {
160        self.worker_type
161    }
162
163    pub fn cluster_id(&self) -> &str {
164        &self.cluster_id
165    }
166
167    /// Subscribe to notification from meta.
168    pub async fn subscribe(
169        &self,
170        subscribe_type: SubscribeType,
171    ) -> Result<Streaming<SubscribeResponse>> {
172        let request = SubscribeRequest {
173            subscribe_type: subscribe_type as i32,
174            host: Some(self.host_addr.to_protobuf()),
175            worker_id: self.worker_id(),
176        };
177
178        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
179            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
180            true,
181        );
182
183        tokio_retry::Retry::spawn(retry_strategy, || async {
184            let request = request.clone();
185            self.inner.subscribe(request).await
186        })
187        .await
188    }
189
190    pub async fn create_connection(
191        &self,
192        connection_name: String,
193        database_id: DatabaseId,
194        schema_id: SchemaId,
195        owner_id: UserId,
196        req: create_connection_request::Payload,
197    ) -> Result<WaitVersion> {
198        let request = CreateConnectionRequest {
199            name: connection_name,
200            database_id,
201            schema_id,
202            owner_id,
203            payload: Some(req),
204        };
205        let resp = self.inner.create_connection(request).await?;
206        Ok(resp
207            .version
208            .ok_or_else(|| anyhow!("wait version not set"))?)
209    }
210
211    pub async fn create_secret(
212        &self,
213        secret_name: String,
214        database_id: DatabaseId,
215        schema_id: SchemaId,
216        owner_id: UserId,
217        value: Vec<u8>,
218    ) -> Result<WaitVersion> {
219        let request = CreateSecretRequest {
220            name: secret_name,
221            database_id,
222            schema_id,
223            owner_id,
224            value,
225        };
226        let resp = self.inner.create_secret(request).await?;
227        Ok(resp
228            .version
229            .ok_or_else(|| anyhow!("wait version not set"))?)
230    }
231
232    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
233        let request = ListConnectionsRequest {};
234        let resp = self.inner.list_connections(request).await?;
235        Ok(resp.connections)
236    }
237
238    pub async fn drop_connection(
239        &self,
240        connection_id: ConnectionId,
241        cascade: bool,
242    ) -> Result<WaitVersion> {
243        let request = DropConnectionRequest {
244            connection_id,
245            cascade,
246        };
247        let resp = self.inner.drop_connection(request).await?;
248        Ok(resp
249            .version
250            .ok_or_else(|| anyhow!("wait version not set"))?)
251    }
252
253    pub async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<WaitVersion> {
254        let request = DropSecretRequest { secret_id, cascade };
255        let resp = self.inner.drop_secret(request).await?;
256        Ok(resp
257            .version
258            .ok_or_else(|| anyhow!("wait version not set"))?)
259    }
260
261    /// Register the current node to the cluster and set the corresponding worker id.
262    ///
263    /// Retry if there's connection issue with the meta node. Exit the process if the registration fails.
264    pub async fn register_new(
265        addr_strategy: MetaAddressStrategy,
266        worker_type: WorkerType,
267        addr: &HostAddr,
268        property: Property,
269        meta_config: Arc<MetaConfig>,
270    ) -> (Self, SystemParamsReader) {
271        let ret =
272            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
273
274        match ret {
275            Ok(ret) => ret,
276            Err(err) => {
277                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
278                std::process::exit(1);
279            }
280        }
281    }
282
283    async fn register_new_inner(
284        addr_strategy: MetaAddressStrategy,
285        worker_type: WorkerType,
286        addr: &HostAddr,
287        property: Property,
288        meta_config: Arc<MetaConfig>,
289    ) -> Result<(Self, SystemParamsReader)> {
290        tracing::info!("register meta client using strategy: {}", addr_strategy);
291
292        // Retry until reaching `max_heartbeat_interval_secs`
293        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
294            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
295            true,
296        );
297
298        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
299            retry_strategy,
300            || async {
301                let grpc_meta_client =
302                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
303
304                let add_worker_resp = grpc_meta_client
305                    .add_worker_node(AddWorkerNodeRequest {
306                        worker_type: worker_type as i32,
307                        host: Some(addr.to_protobuf()),
308                        property: Some(property.clone()),
309                        resource: Some(risingwave_pb::common::worker_node::Resource {
310                            rw_version: RW_VERSION.to_owned(),
311                            total_memory_bytes: system_memory_available_bytes() as _,
312                            total_cpu_cores: total_cpu_available() as _,
313                            hostname: hostname(),
314                        }),
315                    })
316                    .await
317                    .context("failed to add worker node")?;
318
319                let system_params_resp = grpc_meta_client
320                    .get_system_params(GetSystemParamsRequest {})
321                    .await
322                    .context("failed to get initial system params")?;
323
324                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
325            },
326            // Only retry if there's any transient connection issue.
327            // If the error is from our implementation or business, do not retry it.
328            |e: &RpcError| !e.is_from_tonic_server_impl(),
329        )
330        .await;
331
332        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
333        let worker_id = add_worker_resp
334            .node_id
335            .expect("AddWorkerNodeResponse::node_id is empty");
336
337        let meta_client = Self {
338            worker_id,
339            worker_type,
340            host_addr: addr.clone(),
341            inner: grpc_meta_client,
342            meta_config: meta_config.clone(),
343            cluster_id: add_worker_resp.cluster_id,
344            shutting_down: Arc::new(false.into()),
345        };
346
347        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
348        REPORT_PANIC.call_once(|| {
349            let meta_client_clone = meta_client.clone();
350            std::panic::update_hook(move |default_hook, info| {
351                // Try to report panic event to meta node.
352                meta_client_clone.try_add_panic_event_blocking(info, None);
353                default_hook(info);
354            });
355        });
356
357        Ok((meta_client, system_params_resp.params.unwrap().into()))
358    }
359
360    /// Activate the current node in cluster to confirm it's ready to serve.
361    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
362        let request = ActivateWorkerNodeRequest {
363            host: Some(addr.to_protobuf()),
364            node_id: self.worker_id,
365        };
366        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
367            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
368            true,
369        );
370        tokio_retry::Retry::spawn(retry_strategy, || async {
371            let request = request.clone();
372            self.inner.activate_worker_node(request).await
373        })
374        .await?;
375
376        Ok(())
377    }
378
379    /// Send heartbeat signal to meta service.
380    pub async fn send_heartbeat(&self) -> Result<()> {
381        let request = HeartbeatRequest {
382            node_id: self.worker_id,
383            resource: Some(risingwave_pb::common::worker_node::Resource {
384                rw_version: RW_VERSION.to_owned(),
385                total_memory_bytes: system_memory_available_bytes() as _,
386                total_cpu_cores: total_cpu_available() as _,
387                hostname: hostname(),
388            }),
389        };
390        let resp = self.inner.heartbeat(request).await?;
391        if let Some(status) = resp.status
392            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
393        {
394            // Ignore the error if we're already shutting down.
395            // Otherwise, exit the process.
396            if !self.shutting_down.load(Relaxed) {
397                tracing::error!(message = status.message, "worker expired");
398                std::process::exit(1);
399            }
400        }
401        Ok(())
402    }
403
404    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
405        let request = CreateDatabaseRequest { db: Some(db) };
406        let resp = self.inner.create_database(request).await?;
407        // TODO: handle error in `resp.status` here
408        Ok(resp
409            .version
410            .ok_or_else(|| anyhow!("wait version not set"))?)
411    }
412
413    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
414        let request = CreateSchemaRequest {
415            schema: Some(schema),
416        };
417        let resp = self.inner.create_schema(request).await?;
418        // TODO: handle error in `resp.status` here
419        Ok(resp
420            .version
421            .ok_or_else(|| anyhow!("wait version not set"))?)
422    }
423
424    pub async fn create_materialized_view(
425        &self,
426        table: PbTable,
427        graph: StreamFragmentGraph,
428        dependencies: HashSet<ObjectId>,
429        resource_type: streaming_job_resource_type::ResourceType,
430        if_not_exists: bool,
431        refresh_interval_sec: Option<u64>,
432    ) -> Result<WaitVersion> {
433        let request = CreateMaterializedViewRequest {
434            materialized_view: Some(table),
435            fragment_graph: Some(graph),
436            resource_type: Some(PbStreamingJobResourceType {
437                resource_type: Some(resource_type),
438            }),
439            dependencies: dependencies.into_iter().collect(),
440            if_not_exists,
441            refresh_interval_sec,
442        };
443        let resp = self.inner.create_materialized_view(request).await?;
444        // TODO: handle error in `resp.status` here
445        Ok(resp
446            .version
447            .ok_or_else(|| anyhow!("wait version not set"))?)
448    }
449
450    pub async fn drop_materialized_view(
451        &self,
452        table_id: TableId,
453        cascade: bool,
454    ) -> Result<WaitVersion> {
455        let request = DropMaterializedViewRequest { table_id, cascade };
456
457        let resp = self.inner.drop_materialized_view(request).await?;
458        Ok(resp
459            .version
460            .ok_or_else(|| anyhow!("wait version not set"))?)
461    }
462
463    pub async fn create_source(
464        &self,
465        source: PbSource,
466        graph: Option<StreamFragmentGraph>,
467        if_not_exists: bool,
468    ) -> Result<WaitVersion> {
469        let request = CreateSourceRequest {
470            source: Some(source),
471            fragment_graph: graph,
472            if_not_exists,
473        };
474
475        let resp = self.inner.create_source(request).await?;
476        Ok(resp
477            .version
478            .ok_or_else(|| anyhow!("wait version not set"))?)
479    }
480
481    pub async fn create_sink(
482        &self,
483        sink: PbSink,
484        graph: StreamFragmentGraph,
485        dependencies: HashSet<ObjectId>,
486        if_not_exists: bool,
487    ) -> Result<WaitVersion> {
488        let request = CreateSinkRequest {
489            sink: Some(sink),
490            fragment_graph: Some(graph),
491            dependencies: dependencies.into_iter().collect(),
492            if_not_exists,
493        };
494
495        let resp = self.inner.create_sink(request).await?;
496        Ok(resp
497            .version
498            .ok_or_else(|| anyhow!("wait version not set"))?)
499    }
500
501    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
502        let request = CreateSubscriptionRequest {
503            subscription: Some(subscription),
504        };
505
506        let resp = self.inner.create_subscription(request).await?;
507        Ok(resp
508            .version
509            .ok_or_else(|| anyhow!("wait version not set"))?)
510    }
511
512    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
513        let request = CreateFunctionRequest {
514            function: Some(function),
515        };
516        let resp = self.inner.create_function(request).await?;
517        Ok(resp
518            .version
519            .ok_or_else(|| anyhow!("wait version not set"))?)
520    }
521
522    pub async fn create_table(
523        &self,
524        source: Option<PbSource>,
525        table: PbTable,
526        graph: StreamFragmentGraph,
527        job_type: PbTableJobType,
528        if_not_exists: bool,
529        dependencies: HashSet<ObjectId>,
530    ) -> Result<WaitVersion> {
531        let request = CreateTableRequest {
532            materialized_view: Some(table),
533            fragment_graph: Some(graph),
534            source,
535            job_type: job_type as _,
536            if_not_exists,
537            dependencies: dependencies.into_iter().collect(),
538        };
539        let resp = self.inner.create_table(request).await?;
540        // TODO: handle error in `resp.status` here
541        Ok(resp
542            .version
543            .ok_or_else(|| anyhow!("wait version not set"))?)
544    }
545
546    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
547        let request = CommentOnRequest {
548            comment: Some(comment),
549        };
550        let resp = self.inner.comment_on(request).await?;
551        Ok(resp
552            .version
553            .ok_or_else(|| anyhow!("wait version not set"))?)
554    }
555
556    pub async fn alter_name(
557        &self,
558        object: alter_name_request::Object,
559        name: &str,
560    ) -> Result<WaitVersion> {
561        let request = AlterNameRequest {
562            object: Some(object),
563            new_name: name.to_owned(),
564        };
565        let resp = self.inner.alter_name(request).await?;
566        Ok(resp
567            .version
568            .ok_or_else(|| anyhow!("wait version not set"))?)
569    }
570
571    pub async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<WaitVersion> {
572        let request = AlterOwnerRequest {
573            object: Some(object),
574            owner_id,
575        };
576        let resp = self.inner.alter_owner(request).await?;
577        Ok(resp
578            .version
579            .ok_or_else(|| anyhow!("wait version not set"))?)
580    }
581
582    pub async fn alter_subscription_retention(
583        &self,
584        subscription_id: SubscriptionId,
585        retention_seconds: u64,
586        definition: String,
587    ) -> Result<WaitVersion> {
588        let request = AlterSubscriptionRetentionRequest {
589            subscription_id,
590            retention_seconds,
591            definition,
592        };
593        let resp = self.inner.alter_subscription_retention(request).await?;
594        Ok(resp
595            .version
596            .ok_or_else(|| anyhow!("wait version not set"))?)
597    }
598
599    pub async fn alter_database_param(
600        &self,
601        database_id: DatabaseId,
602        param: AlterDatabaseParam,
603    ) -> Result<WaitVersion> {
604        let request = match param {
605            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
606                let barrier_interval_ms = OptionalUint32 {
607                    value: barrier_interval_ms,
608                };
609                AlterDatabaseParamRequest {
610                    database_id,
611                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
612                        barrier_interval_ms,
613                    )),
614                }
615            }
616            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
617                let checkpoint_frequency = OptionalUint64 {
618                    value: checkpoint_frequency,
619                };
620                AlterDatabaseParamRequest {
621                    database_id,
622                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
623                        checkpoint_frequency,
624                    )),
625                }
626            }
627        };
628        let resp = self.inner.alter_database_param(request).await?;
629        Ok(resp
630            .version
631            .ok_or_else(|| anyhow!("wait version not set"))?)
632    }
633
634    pub async fn alter_set_schema(
635        &self,
636        object: alter_set_schema_request::Object,
637        new_schema_id: SchemaId,
638    ) -> Result<WaitVersion> {
639        let request = AlterSetSchemaRequest {
640            new_schema_id,
641            object: Some(object),
642        };
643        let resp = self.inner.alter_set_schema(request).await?;
644        Ok(resp
645            .version
646            .ok_or_else(|| anyhow!("wait version not set"))?)
647    }
648
649    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
650        let request = AlterSourceRequest {
651            source: Some(source),
652        };
653        let resp = self.inner.alter_source(request).await?;
654        Ok(resp
655            .version
656            .ok_or_else(|| anyhow!("wait version not set"))?)
657    }
658
659    pub async fn alter_parallelism(
660        &self,
661        job_id: JobId,
662        parallelism: PbTableParallelism,
663        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
664        deferred: bool,
665    ) -> Result<()> {
666        let request = AlterParallelismRequest {
667            table_id: job_id,
668            parallelism: Some(parallelism),
669            deferred,
670            adaptive_parallelism_strategy: adaptive_parallelism_strategy
671                .map(|strategy| strategy.to_string()),
672        };
673
674        self.inner.alter_parallelism(request).await?;
675        Ok(())
676    }
677
678    pub async fn alter_backfill_parallelism(
679        &self,
680        job_id: JobId,
681        parallelism: Option<PbTableParallelism>,
682        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
683        deferred: bool,
684    ) -> Result<()> {
685        let request = AlterBackfillParallelismRequest {
686            table_id: job_id,
687            parallelism,
688            deferred,
689            adaptive_parallelism_strategy: adaptive_parallelism_strategy
690                .map(|strategy| strategy.to_string()),
691        };
692
693        self.inner.alter_backfill_parallelism(request).await?;
694        Ok(())
695    }
696
697    pub async fn alter_streaming_job_config(
698        &self,
699        job_id: JobId,
700        entries_to_add: HashMap<String, String>,
701        keys_to_remove: Vec<String>,
702    ) -> Result<()> {
703        let request = AlterStreamingJobConfigRequest {
704            job_id,
705            entries_to_add,
706            keys_to_remove,
707        };
708
709        self.inner.alter_streaming_job_config(request).await?;
710        Ok(())
711    }
712
713    pub async fn alter_fragment_parallelism(
714        &self,
715        fragment_ids: Vec<FragmentId>,
716        parallelism: Option<PbTableParallelism>,
717    ) -> Result<()> {
718        let request = AlterFragmentParallelismRequest {
719            fragment_ids,
720            parallelism,
721        };
722
723        self.inner.alter_fragment_parallelism(request).await?;
724        Ok(())
725    }
726
727    pub async fn alter_cdc_table_backfill_parallelism(
728        &self,
729        table_id: JobId,
730        parallelism: PbTableParallelism,
731    ) -> Result<()> {
732        let request = AlterCdcTableBackfillParallelismRequest {
733            table_id,
734            parallelism: Some(parallelism),
735        };
736        self.inner
737            .alter_cdc_table_backfill_parallelism(request)
738            .await?;
739        Ok(())
740    }
741
742    pub async fn alter_resource_group(
743        &self,
744        table_id: TableId,
745        resource_group: Option<String>,
746        deferred: bool,
747    ) -> Result<()> {
748        let request = AlterResourceGroupRequest {
749            table_id,
750            resource_group,
751            deferred,
752        };
753
754        self.inner.alter_resource_group(request).await?;
755        Ok(())
756    }
757
758    pub async fn alter_swap_rename(
759        &self,
760        object: alter_swap_rename_request::Object,
761    ) -> Result<WaitVersion> {
762        let request = AlterSwapRenameRequest {
763            object: Some(object),
764        };
765        let resp = self.inner.alter_swap_rename(request).await?;
766        Ok(resp
767            .version
768            .ok_or_else(|| anyhow!("wait version not set"))?)
769    }
770
771    pub async fn alter_secret(
772        &self,
773        secret_id: SecretId,
774        secret_name: String,
775        database_id: DatabaseId,
776        schema_id: SchemaId,
777        owner_id: UserId,
778        value: Vec<u8>,
779    ) -> Result<WaitVersion> {
780        let request = AlterSecretRequest {
781            secret_id,
782            name: secret_name,
783            database_id,
784            schema_id,
785            owner_id,
786            value,
787        };
788        let resp = self.inner.alter_secret(request).await?;
789        Ok(resp
790            .version
791            .ok_or_else(|| anyhow!("wait version not set"))?)
792    }
793
794    pub async fn replace_job(
795        &self,
796        graph: StreamFragmentGraph,
797        replace_job: ReplaceJob,
798    ) -> Result<WaitVersion> {
799        let request = ReplaceJobPlanRequest {
800            plan: Some(ReplaceJobPlan {
801                fragment_graph: Some(graph),
802                replace_job: Some(replace_job),
803            }),
804        };
805        let resp = self.inner.replace_job_plan(request).await?;
806        // TODO: handle error in `resp.status` here
807        Ok(resp
808            .version
809            .ok_or_else(|| anyhow!("wait version not set"))?)
810    }
811
812    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
813        let request = AutoSchemaChangeRequest {
814            schema_change: Some(schema_change),
815        };
816        let _ = self.inner.auto_schema_change(request).await?;
817        Ok(())
818    }
819
820    pub async fn create_view(
821        &self,
822        view: PbView,
823        dependencies: HashSet<ObjectId>,
824    ) -> Result<WaitVersion> {
825        let request = CreateViewRequest {
826            view: Some(view),
827            dependencies: dependencies.into_iter().collect(),
828        };
829        let resp = self.inner.create_view(request).await?;
830        // TODO: handle error in `resp.status` here
831        Ok(resp
832            .version
833            .ok_or_else(|| anyhow!("wait version not set"))?)
834    }
835
836    pub async fn create_index(
837        &self,
838        index: PbIndex,
839        table: PbTable,
840        graph: StreamFragmentGraph,
841        if_not_exists: bool,
842    ) -> Result<WaitVersion> {
843        let request = CreateIndexRequest {
844            index: Some(index),
845            index_table: Some(table),
846            fragment_graph: Some(graph),
847            if_not_exists,
848        };
849        let resp = self.inner.create_index(request).await?;
850        // TODO: handle error in `resp.status` here
851        Ok(resp
852            .version
853            .ok_or_else(|| anyhow!("wait version not set"))?)
854    }
855
856    pub async fn drop_table(
857        &self,
858        source_id: Option<SourceId>,
859        table_id: TableId,
860        cascade: bool,
861    ) -> Result<WaitVersion> {
862        let request = DropTableRequest {
863            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
864            table_id,
865            cascade,
866        };
867
868        let resp = self.inner.drop_table(request).await?;
869        Ok(resp
870            .version
871            .ok_or_else(|| anyhow!("wait version not set"))?)
872    }
873
874    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
875        let request = CompactIcebergTableRequest { sink_id };
876        let resp = self.inner.compact_iceberg_table(request).await?;
877        Ok(resp.task_id)
878    }
879
880    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
881        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
882        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
883        Ok(())
884    }
885
886    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
887        let request = DropViewRequest { view_id, cascade };
888        let resp = self.inner.drop_view(request).await?;
889        Ok(resp
890            .version
891            .ok_or_else(|| anyhow!("wait version not set"))?)
892    }
893
894    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
895        let request = DropSourceRequest { source_id, cascade };
896        let resp = self.inner.drop_source(request).await?;
897        Ok(resp
898            .version
899            .ok_or_else(|| anyhow!("wait version not set"))?)
900    }
901
902    pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
903        let request = ResetSourceRequest { source_id };
904        let resp = self.inner.reset_source(request).await?;
905        Ok(resp
906            .version
907            .ok_or_else(|| anyhow!("wait version not set"))?)
908    }
909
910    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
911        let request = DropSinkRequest { sink_id, cascade };
912        let resp = self.inner.drop_sink(request).await?;
913        Ok(resp
914            .version
915            .ok_or_else(|| anyhow!("wait version not set"))?)
916    }
917
918    pub async fn drop_subscription(
919        &self,
920        subscription_id: SubscriptionId,
921        cascade: bool,
922    ) -> Result<WaitVersion> {
923        let request = DropSubscriptionRequest {
924            subscription_id,
925            cascade,
926        };
927        let resp = self.inner.drop_subscription(request).await?;
928        Ok(resp
929            .version
930            .ok_or_else(|| anyhow!("wait version not set"))?)
931    }
932
933    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
934        let request = DropIndexRequest { index_id, cascade };
935        let resp = self.inner.drop_index(request).await?;
936        Ok(resp
937            .version
938            .ok_or_else(|| anyhow!("wait version not set"))?)
939    }
940
941    pub async fn drop_function(
942        &self,
943        function_id: FunctionId,
944        cascade: bool,
945    ) -> Result<WaitVersion> {
946        let request = DropFunctionRequest {
947            function_id,
948            cascade,
949        };
950        let resp = self.inner.drop_function(request).await?;
951        Ok(resp
952            .version
953            .ok_or_else(|| anyhow!("wait version not set"))?)
954    }
955
956    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
957        let request = DropDatabaseRequest { database_id };
958        let resp = self.inner.drop_database(request).await?;
959        Ok(resp
960            .version
961            .ok_or_else(|| anyhow!("wait version not set"))?)
962    }
963
964    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
965        let request = DropSchemaRequest { schema_id, cascade };
966        let resp = self.inner.drop_schema(request).await?;
967        Ok(resp
968            .version
969            .ok_or_else(|| anyhow!("wait version not set"))?)
970    }
971
972    // TODO: using UserInfoVersion instead as return type.
973    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
974        let request = CreateUserRequest { user: Some(user) };
975        let resp = self.inner.create_user(request).await?;
976        Ok(resp.version)
977    }
978
979    pub async fn drop_user(&self, user_id: UserId) -> Result<u64> {
980        let request = DropUserRequest { user_id };
981        let resp = self.inner.drop_user(request).await?;
982        Ok(resp.version)
983    }
984
985    pub async fn update_user(
986        &self,
987        user: UserInfo,
988        update_fields: Vec<UpdateField>,
989    ) -> Result<u64> {
990        let request = UpdateUserRequest {
991            user: Some(user),
992            update_fields: update_fields
993                .into_iter()
994                .map(|field| field as i32)
995                .collect::<Vec<_>>(),
996        };
997        let resp = self.inner.update_user(request).await?;
998        Ok(resp.version)
999    }
1000
1001    pub async fn grant_privilege(
1002        &self,
1003        user_ids: Vec<UserId>,
1004        privileges: Vec<GrantPrivilege>,
1005        with_grant_option: bool,
1006        granted_by: UserId,
1007    ) -> Result<u64> {
1008        let request = GrantPrivilegeRequest {
1009            user_ids,
1010            privileges,
1011            with_grant_option,
1012            granted_by,
1013        };
1014        let resp = self.inner.grant_privilege(request).await?;
1015        Ok(resp.version)
1016    }
1017
1018    pub async fn revoke_privilege(
1019        &self,
1020        user_ids: Vec<UserId>,
1021        privileges: Vec<GrantPrivilege>,
1022        granted_by: UserId,
1023        revoke_by: UserId,
1024        revoke_grant_option: bool,
1025        cascade: bool,
1026    ) -> Result<u64> {
1027        let request = RevokePrivilegeRequest {
1028            user_ids,
1029            privileges,
1030            granted_by,
1031            revoke_by,
1032            revoke_grant_option,
1033            cascade,
1034        };
1035        let resp = self.inner.revoke_privilege(request).await?;
1036        Ok(resp.version)
1037    }
1038
1039    pub async fn alter_default_privilege(
1040        &self,
1041        user_ids: Vec<UserId>,
1042        database_id: DatabaseId,
1043        schema_ids: Vec<SchemaId>,
1044        operation: AlterDefaultPrivilegeOperation,
1045        granted_by: UserId,
1046    ) -> Result<()> {
1047        let request = AlterDefaultPrivilegeRequest {
1048            user_ids,
1049            database_id,
1050            schema_ids,
1051            operation: Some(operation),
1052            granted_by,
1053        };
1054        self.inner.alter_default_privilege(request).await?;
1055        Ok(())
1056    }
1057
1058    /// Unregister the current node from the cluster.
1059    pub async fn unregister(&self) -> Result<()> {
1060        let request = DeleteWorkerNodeRequest {
1061            host: Some(self.host_addr.to_protobuf()),
1062        };
1063        self.inner.delete_worker_node(request).await?;
1064        self.shutting_down.store(true, Relaxed);
1065        Ok(())
1066    }
1067
1068    /// Try to unregister the current worker from the cluster with best effort. Log the result.
1069    pub async fn try_unregister(&self) {
1070        match self.unregister().await {
1071            Ok(_) => {
1072                tracing::info!(
1073                    worker_id = %self.worker_id(),
1074                    "successfully unregistered from meta service",
1075                )
1076            }
1077            Err(e) => {
1078                tracing::warn!(
1079                    error = %e.as_report(),
1080                    worker_id = %self.worker_id(),
1081                    "failed to unregister from meta service",
1082                );
1083            }
1084        }
1085    }
1086
1087    pub async fn list_worker_nodes(
1088        &self,
1089        worker_type: Option<WorkerType>,
1090    ) -> Result<Vec<WorkerNode>> {
1091        let request = ListAllNodesRequest {
1092            worker_type: worker_type.map(Into::into),
1093            include_starting_nodes: true,
1094        };
1095        let resp = self.inner.list_all_nodes(request).await?;
1096        Ok(resp.nodes)
1097    }
1098
1099    /// Starts a heartbeat worker.
1100    pub fn start_heartbeat_loop(
1101        meta_client: MetaClient,
1102        min_interval: Duration,
1103    ) -> (JoinHandle<()>, Sender<()>) {
1104        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1105        let join_handle = tokio::spawn(async move {
1106            let mut min_interval_ticker = tokio::time::interval(min_interval);
1107            loop {
1108                tokio::select! {
1109                    biased;
1110                    // Shutdown
1111                    _ = &mut shutdown_rx => {
1112                        tracing::info!("Heartbeat loop is stopped");
1113                        return;
1114                    }
1115                    // Wait for interval
1116                    _ = min_interval_ticker.tick() => {},
1117                }
1118                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
1119                match tokio::time::timeout(
1120                    // TODO: decide better min_interval for timeout
1121                    min_interval * 3,
1122                    meta_client.send_heartbeat(),
1123                )
1124                .await
1125                {
1126                    Ok(Ok(_)) => {}
1127                    Ok(Err(err)) => {
1128                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
1129                    }
1130                    Err(_) => {
1131                        tracing::warn!("Failed to send_heartbeat: timeout");
1132                    }
1133                }
1134            }
1135        });
1136        (join_handle, shutdown_tx)
1137    }
1138
1139    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1140        let request = RisectlListStateTablesRequest {};
1141        let resp = self.inner.risectl_list_state_tables(request).await?;
1142        Ok(resp.tables)
1143    }
1144
1145    pub async fn risectl_resume_backfill(
1146        &self,
1147        request: RisectlResumeBackfillRequest,
1148    ) -> Result<()> {
1149        self.inner.risectl_resume_backfill(request).await?;
1150        Ok(())
1151    }
1152
1153    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1154        let request = FlushRequest { database_id };
1155        let resp = self.inner.flush(request).await?;
1156        Ok(resp.hummock_version_id)
1157    }
1158
1159    pub async fn wait(&self, job_id: Option<JobId>) -> Result<WaitVersion> {
1160        let request = WaitRequest { job_id };
1161        let resp = self.inner.wait(request).await?;
1162        Ok(resp
1163            .version
1164            .ok_or_else(|| anyhow!("wait version not set"))?)
1165    }
1166
1167    pub async fn recover(&self) -> Result<()> {
1168        let request = RecoverRequest {};
1169        self.inner.recover(request).await?;
1170        Ok(())
1171    }
1172
1173    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1174        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1175        let resp = self.inner.cancel_creating_jobs(request).await?;
1176        Ok(resp.canceled_jobs)
1177    }
1178
1179    pub async fn list_table_fragments(
1180        &self,
1181        job_ids: &[JobId],
1182    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
1183        let request = ListTableFragmentsRequest {
1184            table_ids: job_ids.to_vec(),
1185        };
1186        let resp = self.inner.list_table_fragments(request).await?;
1187        Ok(resp.table_fragments)
1188    }
1189
1190    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1191        let resp = self
1192            .inner
1193            .list_streaming_job_states(ListStreamingJobStatesRequest {})
1194            .await?;
1195        Ok(resp.states)
1196    }
1197
1198    pub async fn list_fragment_distributions(
1199        &self,
1200        include_node: bool,
1201    ) -> Result<Vec<FragmentDistribution>> {
1202        let resp = self
1203            .inner
1204            .list_fragment_distribution(ListFragmentDistributionRequest {
1205                include_node: Some(include_node),
1206            })
1207            .await?;
1208        Ok(resp.distributions)
1209    }
1210
1211    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
1212        let resp = self
1213            .inner
1214            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {
1215                include_node: Some(true),
1216            })
1217            .await?;
1218        Ok(resp.distributions)
1219    }
1220
1221    pub async fn get_fragment_by_id(
1222        &self,
1223        fragment_id: FragmentId,
1224    ) -> Result<Option<FragmentDistribution>> {
1225        let resp = self
1226            .inner
1227            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1228            .await?;
1229        Ok(resp.distribution)
1230    }
1231
1232    pub async fn get_fragment_vnodes(
1233        &self,
1234        fragment_id: FragmentId,
1235    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
1236        let resp = self
1237            .inner
1238            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
1239            .await?;
1240        Ok(resp
1241            .actor_vnodes
1242            .into_iter()
1243            .map(|actor| (actor.actor_id, actor.vnode_indices))
1244            .collect())
1245    }
1246
1247    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
1248        let resp = self
1249            .inner
1250            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
1251            .await?;
1252        Ok(resp.vnode_indices)
1253    }
1254
1255    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1256        let resp = self
1257            .inner
1258            .list_actor_states(ListActorStatesRequest {})
1259            .await?;
1260        Ok(resp.states)
1261    }
1262
1263    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1264        let resp = self
1265            .inner
1266            .list_actor_splits(ListActorSplitsRequest {})
1267            .await?;
1268
1269        Ok(resp.actor_splits)
1270    }
1271
1272    pub async fn pause(&self) -> Result<PauseResponse> {
1273        let request = PauseRequest {};
1274        let resp = self.inner.pause(request).await?;
1275        Ok(resp)
1276    }
1277
1278    pub async fn resume(&self) -> Result<ResumeResponse> {
1279        let request = ResumeRequest {};
1280        let resp = self.inner.resume(request).await?;
1281        Ok(resp)
1282    }
1283
1284    pub async fn apply_throttle(
1285        &self,
1286        throttle_target: PbThrottleTarget,
1287        throttle_type: risingwave_pb::common::PbThrottleType,
1288        id: u32,
1289        rate: Option<u32>,
1290    ) -> Result<ApplyThrottleResponse> {
1291        let request = ApplyThrottleRequest {
1292            throttle_target: throttle_target as i32,
1293            throttle_type: throttle_type as i32,
1294            id,
1295            rate,
1296        };
1297        let resp = self.inner.apply_throttle(request).await?;
1298        Ok(resp)
1299    }
1300
1301    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1302        let resp = self
1303            .inner
1304            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1305            .await?;
1306        Ok(resp.get_status().unwrap())
1307    }
1308
1309    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1310        let request = GetClusterInfoRequest {};
1311        let resp = self.inner.get_cluster_info(request).await?;
1312        Ok(resp)
1313    }
1314
1315    pub async fn reschedule(
1316        &self,
1317        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1318        revision: u64,
1319        resolve_no_shuffle_upstream: bool,
1320    ) -> Result<(bool, u64)> {
1321        let request = RescheduleRequest {
1322            revision,
1323            resolve_no_shuffle_upstream,
1324            worker_reschedules,
1325        };
1326        let resp = self.inner.reschedule(request).await?;
1327        Ok((resp.success, resp.revision))
1328    }
1329
1330    pub async fn risectl_get_pinned_versions_summary(
1331        &self,
1332    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1333        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1334        self.inner
1335            .rise_ctl_get_pinned_versions_summary(request)
1336            .await
1337    }
1338
1339    pub async fn risectl_get_checkpoint_hummock_version(
1340        &self,
1341    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1342        let request = RiseCtlGetCheckpointVersionRequest {};
1343        self.inner.rise_ctl_get_checkpoint_version(request).await
1344    }
1345
1346    pub async fn risectl_pause_hummock_version_checkpoint(
1347        &self,
1348    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1349        let request = RiseCtlPauseVersionCheckpointRequest {};
1350        self.inner.rise_ctl_pause_version_checkpoint(request).await
1351    }
1352
1353    pub async fn risectl_resume_hummock_version_checkpoint(
1354        &self,
1355    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1356        let request = RiseCtlResumeVersionCheckpointRequest {};
1357        self.inner.rise_ctl_resume_version_checkpoint(request).await
1358    }
1359
1360    pub async fn init_metadata_for_replay(
1361        &self,
1362        tables: Vec<PbTable>,
1363        compaction_groups: Vec<CompactionGroupInfo>,
1364    ) -> Result<()> {
1365        let req = InitMetadataForReplayRequest {
1366            tables,
1367            compaction_groups,
1368        };
1369        let _resp = self.inner.init_metadata_for_replay(req).await?;
1370        Ok(())
1371    }
1372
1373    pub async fn replay_version_delta(
1374        &self,
1375        version_delta: HummockVersionDelta,
1376    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1377        let req = ReplayVersionDeltaRequest {
1378            version_delta: Some(version_delta.into()),
1379        };
1380        let resp = self.inner.replay_version_delta(req).await?;
1381        Ok((
1382            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1383            resp.modified_compaction_groups,
1384        ))
1385    }
1386
1387    pub async fn list_version_deltas(
1388        &self,
1389        start_id: HummockVersionId,
1390        num_limit: u32,
1391        committed_epoch_limit: HummockEpoch,
1392    ) -> Result<Vec<HummockVersionDelta>> {
1393        let req = ListVersionDeltasRequest {
1394            start_id,
1395            num_limit,
1396            committed_epoch_limit,
1397        };
1398        Ok(self
1399            .inner
1400            .list_version_deltas(req)
1401            .await?
1402            .version_deltas
1403            .unwrap()
1404            .version_deltas
1405            .iter()
1406            .map(HummockVersionDelta::from_rpc_protobuf)
1407            .collect())
1408    }
1409
1410    pub async fn trigger_compaction_deterministic(
1411        &self,
1412        version_id: HummockVersionId,
1413        compaction_groups: Vec<CompactionGroupId>,
1414    ) -> Result<()> {
1415        let req = TriggerCompactionDeterministicRequest {
1416            version_id,
1417            compaction_groups,
1418        };
1419        self.inner.trigger_compaction_deterministic(req).await?;
1420        Ok(())
1421    }
1422
1423    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1424        let req = DisableCommitEpochRequest {};
1425        Ok(HummockVersion::from_rpc_protobuf(
1426            &self
1427                .inner
1428                .disable_commit_epoch(req)
1429                .await?
1430                .current_version
1431                .unwrap(),
1432        ))
1433    }
1434
1435    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1436        let req = GetAssignedCompactTaskNumRequest {};
1437        let resp = self.inner.get_assigned_compact_task_num(req).await?;
1438        Ok(resp.num_tasks as usize)
1439    }
1440
1441    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1442        let req = RiseCtlListCompactionGroupRequest {};
1443        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1444        Ok(resp.compaction_groups)
1445    }
1446
1447    pub async fn risectl_update_compaction_config(
1448        &self,
1449        compaction_groups: &[CompactionGroupId],
1450        configs: &[MutableConfig],
1451    ) -> Result<()> {
1452        let req = RiseCtlUpdateCompactionConfigRequest {
1453            compaction_group_ids: compaction_groups.to_vec(),
1454            configs: configs
1455                .iter()
1456                .map(
1457                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1458                        mutable_config: Some(c.clone()),
1459                    },
1460                )
1461                .collect(),
1462        };
1463        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1464        Ok(())
1465    }
1466
1467    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1468        let req = BackupMetaRequest { remarks };
1469        let resp = self.inner.backup_meta(req).await?;
1470        Ok(resp.job_id)
1471    }
1472
1473    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1474        let req = GetBackupJobStatusRequest { job_id };
1475        let resp = self.inner.get_backup_job_status(req).await?;
1476        Ok((resp.job_status(), resp.message))
1477    }
1478
1479    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1480        let req = DeleteMetaSnapshotRequest {
1481            snapshot_ids: snapshot_ids.to_vec(),
1482        };
1483        let _resp = self.inner.delete_meta_snapshot(req).await?;
1484        Ok(())
1485    }
1486
1487    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1488        let req = GetMetaSnapshotManifestRequest {};
1489        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1490        Ok(resp.manifest.expect("should exist"))
1491    }
1492
1493    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1494        let req = GetTelemetryInfoRequest {};
1495        let resp = self.inner.get_telemetry_info(req).await?;
1496        Ok(resp)
1497    }
1498
1499    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1500        let req = GetMetaStoreInfoRequest {};
1501        let resp = self.inner.get_meta_store_info(req).await?;
1502        Ok(resp.meta_store_endpoint)
1503    }
1504
1505    pub async fn alter_sink_props(
1506        &self,
1507        sink_id: SinkId,
1508        changed_props: BTreeMap<String, String>,
1509        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1510        connector_conn_ref: Option<ConnectionId>,
1511    ) -> Result<()> {
1512        let req = AlterConnectorPropsRequest {
1513            object_id: sink_id.as_raw_id(),
1514            changed_props: changed_props.into_iter().collect(),
1515            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1516            connector_conn_ref,
1517            object_type: AlterConnectorPropsObject::Sink as i32,
1518            extra_options: None,
1519        };
1520        let _resp = self.inner.alter_connector_props(req).await?;
1521        Ok(())
1522    }
1523
1524    pub async fn alter_iceberg_table_props(
1525        &self,
1526        table_id: TableId,
1527        sink_id: SinkId,
1528        source_id: SourceId,
1529        changed_props: BTreeMap<String, String>,
1530        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1531        connector_conn_ref: Option<ConnectionId>,
1532    ) -> Result<()> {
1533        let req = AlterConnectorPropsRequest {
1534            object_id: table_id.as_raw_id(),
1535            changed_props: changed_props.into_iter().collect(),
1536            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1537            connector_conn_ref,
1538            object_type: AlterConnectorPropsObject::IcebergTable as i32,
1539            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1540                sink_id,
1541                source_id,
1542            })),
1543        };
1544        let _resp = self.inner.alter_connector_props(req).await?;
1545        Ok(())
1546    }
1547
1548    pub async fn alter_source_connector_props(
1549        &self,
1550        source_id: SourceId,
1551        changed_props: BTreeMap<String, String>,
1552        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1553        connector_conn_ref: Option<ConnectionId>,
1554    ) -> Result<()> {
1555        let req = AlterConnectorPropsRequest {
1556            object_id: source_id.as_raw_id(),
1557            changed_props: changed_props.into_iter().collect(),
1558            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1559            connector_conn_ref,
1560            object_type: AlterConnectorPropsObject::Source as i32,
1561            extra_options: None,
1562        };
1563        let _resp = self.inner.alter_connector_props(req).await?;
1564        Ok(())
1565    }
1566
1567    pub async fn alter_connection_connector_props(
1568        &self,
1569        connection_id: u32,
1570        changed_props: BTreeMap<String, String>,
1571        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1572    ) -> Result<()> {
1573        let req = AlterConnectorPropsRequest {
1574            object_id: connection_id,
1575            changed_props: changed_props.into_iter().collect(),
1576            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1577            connector_conn_ref: None, // Connections don't reference other connections
1578            object_type: AlterConnectorPropsObject::Connection as i32,
1579            extra_options: None,
1580        };
1581        let _resp = self.inner.alter_connector_props(req).await?;
1582        Ok(())
1583    }
1584
1585    /// Orchestrated source property update with pause/update/resume workflow.
1586    /// This is the "safe" version that pauses sources before updating and resumes after.
1587    pub async fn alter_source_properties_safe(
1588        &self,
1589        source_id: SourceId,
1590        changed_props: BTreeMap<String, String>,
1591        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1592        reset_splits: bool,
1593    ) -> Result<()> {
1594        let req = AlterSourcePropertiesSafeRequest {
1595            source_id: source_id.as_raw_id(),
1596            changed_props: changed_props.into_iter().collect(),
1597            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1598            options: Some(PropertyUpdateOptions { reset_splits }),
1599        };
1600        let _resp = self.inner.alter_source_properties_safe(req).await?;
1601        Ok(())
1602    }
1603
1604    /// Reset source split assignments (UNSAFE - admin only).
1605    /// This clears persisted split metadata and triggers re-discovery.
1606    pub async fn reset_source_splits(&self, source_id: SourceId) -> Result<()> {
1607        let req = ResetSourceSplitsRequest {
1608            source_id: source_id.as_raw_id(),
1609        };
1610        let _resp = self.inner.reset_source_splits(req).await?;
1611        Ok(())
1612    }
1613
1614    /// Inject specific offsets into source splits (UNSAFE - admin only).
1615    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
1616    pub async fn inject_source_offsets(
1617        &self,
1618        source_id: SourceId,
1619        split_offsets: HashMap<String, String>,
1620    ) -> Result<Vec<String>> {
1621        let req = InjectSourceOffsetsRequest {
1622            source_id: source_id.as_raw_id(),
1623            split_offsets,
1624        };
1625        let resp = self.inner.inject_source_offsets(req).await?;
1626        Ok(resp.applied_split_ids)
1627    }
1628
1629    pub async fn set_system_param(
1630        &self,
1631        param: String,
1632        value: Option<String>,
1633    ) -> Result<Option<SystemParamsReader>> {
1634        let req = SetSystemParamRequest { param, value };
1635        let resp = self.inner.set_system_param(req).await?;
1636        Ok(resp.params.map(SystemParamsReader::from))
1637    }
1638
1639    pub async fn get_session_params(&self) -> Result<String> {
1640        let req = GetSessionParamsRequest {};
1641        let resp = self.inner.get_session_params(req).await?;
1642        Ok(resp.params)
1643    }
1644
1645    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1646        let req = SetSessionParamRequest { param, value };
1647        let resp = self.inner.set_session_param(req).await?;
1648        Ok(resp.param)
1649    }
1650
1651    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1652        let req = GetDdlProgressRequest {};
1653        let resp = self.inner.get_ddl_progress(req).await?;
1654        Ok(resp.ddl_progress)
1655    }
1656
1657    pub async fn split_compaction_group(
1658        &self,
1659        group_id: CompactionGroupId,
1660        table_ids_to_new_group: &[TableId],
1661        partition_vnode_count: u32,
1662    ) -> Result<CompactionGroupId> {
1663        let req = SplitCompactionGroupRequest {
1664            group_id,
1665            table_ids: table_ids_to_new_group.to_vec(),
1666            partition_vnode_count,
1667        };
1668        let resp = self.inner.split_compaction_group(req).await?;
1669        Ok(resp.new_group_id)
1670    }
1671
1672    pub async fn get_tables(
1673        &self,
1674        table_ids: Vec<TableId>,
1675        include_dropped_tables: bool,
1676    ) -> Result<HashMap<TableId, Table>> {
1677        let req = GetTablesRequest {
1678            table_ids,
1679            include_dropped_tables,
1680        };
1681        let resp = self.inner.get_tables(req).await?;
1682        Ok(resp.tables)
1683    }
1684
1685    pub async fn list_serving_vnode_mappings(
1686        &self,
1687    ) -> Result<HashMap<FragmentId, (JobId, WorkerSlotMapping)>> {
1688        let req = GetServingVnodeMappingsRequest {};
1689        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1690        let mappings = resp
1691            .worker_slot_mappings
1692            .into_iter()
1693            .map(|p| {
1694                (
1695                    p.fragment_id,
1696                    (
1697                        resp.fragment_to_table
1698                            .get(&p.fragment_id)
1699                            .cloned()
1700                            .unwrap_or_default(),
1701                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1702                    ),
1703                )
1704            })
1705            .collect();
1706        Ok(mappings)
1707    }
1708
1709    pub async fn risectl_list_compaction_status(
1710        &self,
1711    ) -> Result<(
1712        Vec<CompactStatus>,
1713        Vec<CompactTaskAssignment>,
1714        Vec<CompactTaskProgress>,
1715    )> {
1716        let req = RiseCtlListCompactionStatusRequest {};
1717        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1718        Ok((
1719            resp.compaction_statuses,
1720            resp.task_assignment,
1721            resp.task_progress,
1722        ))
1723    }
1724
1725    pub async fn get_compaction_score(
1726        &self,
1727        compaction_group_id: CompactionGroupId,
1728    ) -> Result<Vec<PickerInfo>> {
1729        let req = GetCompactionScoreRequest {
1730            compaction_group_id,
1731        };
1732        let resp = self.inner.get_compaction_score(req).await?;
1733        Ok(resp.scores)
1734    }
1735
1736    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1737        let req = RiseCtlRebuildTableStatsRequest {};
1738        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1739        Ok(())
1740    }
1741
1742    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1743        let req = ListBranchedObjectRequest {};
1744        let resp = self.inner.list_branched_object(req).await?;
1745        Ok(resp.branched_objects)
1746    }
1747
1748    pub async fn list_active_write_limit(&self) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
1749        let req = ListActiveWriteLimitRequest {};
1750        let resp = self.inner.list_active_write_limit(req).await?;
1751        Ok(resp.write_limits)
1752    }
1753
1754    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1755        let req = ListHummockMetaConfigRequest {};
1756        let resp = self.inner.list_hummock_meta_config(req).await?;
1757        Ok(resp.configs)
1758    }
1759
1760    pub async fn get_table_change_logs(
1761        &self,
1762        epoch_only: bool,
1763        start_epoch_inclusive: Option<u64>,
1764        end_epoch_inclusive: Option<u64>,
1765        table_ids: Option<HashSet<TableId>>,
1766        exclude_empty: bool,
1767        limit: Option<u32>,
1768    ) -> Result<TableChangeLogs> {
1769        let req = GetTableChangeLogsRequest {
1770            epoch_only,
1771            start_epoch_inclusive,
1772            end_epoch_inclusive,
1773            table_ids: table_ids.map(|iter| PbTableFilter {
1774                table_ids: iter.into_iter().collect::<Vec<_>>(),
1775            }),
1776            exclude_empty,
1777            limit,
1778        };
1779        let resp = self.inner.get_table_change_logs(req).await?;
1780        Ok(resp
1781            .table_change_logs
1782            .into_iter()
1783            .map(|(id, change_log)| (TableId::new(id), TableChangeLog::from_protobuf(&change_log)))
1784            .collect())
1785    }
1786
1787    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1788        let _resp = self
1789            .inner
1790            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1791            .await?;
1792
1793        Ok(())
1794    }
1795
1796    pub async fn rw_cloud_validate_source(
1797        &self,
1798        source_type: SourceType,
1799        source_config: HashMap<String, String>,
1800    ) -> Result<RwCloudValidateSourceResponse> {
1801        let req = RwCloudValidateSourceRequest {
1802            source_type: source_type.into(),
1803            source_config,
1804        };
1805        let resp = self.inner.rw_cloud_validate_source(req).await?;
1806        Ok(resp)
1807    }
1808
1809    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1810        self.inner.core.read().await.sink_coordinate_client.clone()
1811    }
1812
1813    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1814        let req = ListCompactTaskAssignmentRequest {};
1815        let resp = self.inner.list_compact_task_assignment(req).await?;
1816        Ok(resp.task_assignment)
1817    }
1818
1819    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1820        let req = ListEventLogRequest::default();
1821        let resp = self.inner.list_event_log(req).await?;
1822        Ok(resp.event_logs)
1823    }
1824
1825    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1826        let req = ListCompactTaskProgressRequest {};
1827        let resp = self.inner.list_compact_task_progress(req).await?;
1828        Ok(resp.task_progress)
1829    }
1830
    /// `cfg(madsim)` variant of `try_add_panic_event_blocking`: a no-op that ignores both
    /// arguments and sends nothing.
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }
1838
1839    /// If `timeout_millis` is None, default is used.
1840    #[cfg(not(madsim))]
1841    pub fn try_add_panic_event_blocking(
1842        &self,
1843        panic_info: impl Display,
1844        timeout_millis: Option<u64>,
1845    ) {
1846        let event = event_log::EventWorkerNodePanic {
1847            worker_id: self.worker_id,
1848            worker_type: self.worker_type.into(),
1849            host_addr: Some(self.host_addr.to_protobuf()),
1850            panic_info: format!("{panic_info}"),
1851        };
1852        let grpc_meta_client = self.inner.clone();
1853        let _ = thread::spawn(move || {
1854            let rt = tokio::runtime::Builder::new_current_thread()
1855                .enable_all()
1856                .build()
1857                .unwrap();
1858            let req = AddEventLogRequest {
1859                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1860            };
1861            rt.block_on(async {
1862                let _ = tokio::time::timeout(
1863                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1864                    grpc_meta_client.add_event_log(req),
1865                )
1866                .await;
1867            });
1868        })
1869        .join();
1870    }
1871
1872    pub async fn add_sink_fail_evet(
1873        &self,
1874        sink_id: SinkId,
1875        sink_name: String,
1876        connector: String,
1877        error: String,
1878    ) -> Result<()> {
1879        let event = event_log::EventSinkFail {
1880            sink_id,
1881            sink_name,
1882            connector,
1883            error,
1884        };
1885        let req = AddEventLogRequest {
1886            event: Some(add_event_log_request::Event::SinkFail(event)),
1887        };
1888        self.inner.add_event_log(req).await?;
1889        Ok(())
1890    }
1891
1892    pub async fn add_cdc_auto_schema_change_fail_event(
1893        &self,
1894        source_id: SourceId,
1895        table_name: String,
1896        cdc_table_id: String,
1897        upstream_ddl: String,
1898        fail_info: String,
1899    ) -> Result<()> {
1900        let event = event_log::EventAutoSchemaChangeFail {
1901            table_id: source_id.as_cdc_table_id(),
1902            table_name,
1903            cdc_table_id,
1904            upstream_ddl,
1905            fail_info,
1906        };
1907        let req = AddEventLogRequest {
1908            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1909        };
1910        self.inner.add_event_log(req).await?;
1911        Ok(())
1912    }
1913
1914    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1915        let req = CancelCompactTaskRequest {
1916            task_id,
1917            task_status: task_status as _,
1918        };
1919        let resp = self.inner.cancel_compact_task(req).await?;
1920        Ok(resp.ret)
1921    }
1922
1923    pub async fn get_version_by_epoch(
1924        &self,
1925        epoch: HummockEpoch,
1926        table_id: TableId,
1927    ) -> Result<PbHummockVersion> {
1928        let req = GetVersionByEpochRequest { epoch, table_id };
1929        let resp = self.inner.get_version_by_epoch(req).await?;
1930        Ok(resp.version.unwrap())
1931    }
1932
1933    pub async fn get_cluster_limits(
1934        &self,
1935    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1936        let req = GetClusterLimitsRequest {};
1937        let resp = self.inner.get_cluster_limits(req).await?;
1938        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1939    }
1940
1941    pub async fn merge_compaction_group(
1942        &self,
1943        left_group_id: CompactionGroupId,
1944        right_group_id: CompactionGroupId,
1945    ) -> Result<()> {
1946        let req = MergeCompactionGroupRequest {
1947            left_group_id,
1948            right_group_id,
1949        };
1950        self.inner.merge_compaction_group(req).await?;
1951        Ok(())
1952    }
1953
1954    /// List all rate limits for sources and backfills
1955    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1956        let request = ListRateLimitsRequest {};
1957        let resp = self.inner.list_rate_limits(request).await?;
1958        Ok(resp.rate_limits)
1959    }
1960
1961    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1962        let request = ListCdcProgressRequest {};
1963        let resp = self.inner.list_cdc_progress(request).await?;
1964        Ok(resp.cdc_progress)
1965    }
1966
1967    pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1968        let request = ListRefreshTableStatesRequest {};
1969        let resp = self.inner.list_refresh_table_states(request).await?;
1970        Ok(resp.states)
1971    }
1972
1973    pub async fn list_iceberg_compaction_status(
1974        &self,
1975    ) -> Result<Vec<list_iceberg_compaction_status_response::IcebergCompactionStatus>> {
1976        let request = ListIcebergCompactionStatusRequest {};
1977        let resp = self.inner.list_iceberg_compaction_status(request).await?;
1978        Ok(resp.statuses)
1979    }
1980
1981    pub async fn list_sink_log_store_tables(
1982        &self,
1983    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1984        let request = ListSinkLogStoreTablesRequest {};
1985        let resp = self.inner.list_sink_log_store_tables(request).await?;
1986        Ok(resp.tables)
1987    }
1988
1989    pub async fn create_iceberg_table(
1990        &self,
1991        table_job_info: PbTableJobInfo,
1992        sink_job_info: PbSinkJobInfo,
1993        iceberg_source: PbSource,
1994        if_not_exists: bool,
1995    ) -> Result<WaitVersion> {
1996        let request = CreateIcebergTableRequest {
1997            table_info: Some(table_job_info),
1998            sink_info: Some(sink_job_info),
1999            iceberg_source: Some(iceberg_source),
2000            if_not_exists,
2001        };
2002
2003        let resp = Box::pin(self.inner.create_iceberg_table(request)).await?;
2004        Ok(resp
2005            .version
2006            .ok_or_else(|| anyhow!("wait version not set"))?)
2007    }
2008
2009    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
2010        let request = ListIcebergTablesRequest {};
2011        let resp = self.inner.list_iceberg_tables(request).await?;
2012        Ok(resp.iceberg_tables)
2013    }
2014
2015    /// Get the await tree of all nodes in the cluster.
2016    pub async fn get_cluster_stack_trace(
2017        &self,
2018        actor_traces_format: ActorTracesFormat,
2019    ) -> Result<StackTraceResponse> {
2020        let request = StackTraceRequest {
2021            actor_traces_format: actor_traces_format as i32,
2022        };
2023        let resp = self.inner.stack_trace(request).await?;
2024        Ok(resp)
2025    }
2026
2027    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
2028        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
2029        self.inner.set_sync_log_store_aligned(request).await?;
2030        Ok(())
2031    }
2032
2033    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
2034        self.inner.refresh(request).await
2035    }
2036
2037    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
2038        let request = ListUnmigratedTablesRequest {};
2039        let resp = self.inner.list_unmigrated_tables(request).await?;
2040        Ok(resp
2041            .tables
2042            .into_iter()
2043            .map(|table| (table.table_id, table.table_name))
2044            .collect())
2045    }
2046}
2047
2048#[async_trait]
2049impl HummockMetaClient for MetaClient {
2050    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
2051        let req = UnpinVersionBeforeRequest {
2052            context_id: self.worker_id(),
2053            unpin_version_before,
2054        };
2055        self.inner.unpin_version_before(req).await?;
2056        Ok(())
2057    }
2058
2059    async fn get_current_version(&self) -> Result<HummockVersion> {
2060        let req = GetCurrentVersionRequest::default();
2061        Ok(HummockVersion::from_rpc_protobuf(
2062            &self
2063                .inner
2064                .get_current_version(req)
2065                .await?
2066                .current_version
2067                .unwrap(),
2068        ))
2069    }
2070
2071    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
2072        let resp = self
2073            .inner
2074            .get_new_object_ids(GetNewObjectIdsRequest { number })
2075            .await?;
2076        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
2077    }
2078
2079    async fn commit_epoch_with_change_log(
2080        &self,
2081        _epoch: HummockEpoch,
2082        _sync_result: SyncResult,
2083        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
2084    ) -> Result<()> {
2085        panic!("Only meta service can commit_epoch in production.")
2086    }
2087
2088    async fn trigger_manual_compaction(
2089        &self,
2090        compaction_group_id: CompactionGroupId,
2091        table_id: JobId,
2092        level: u32,
2093        target_level: Option<u32>,
2094        sst_ids: Vec<HummockSstableId>,
2095        exclusive: bool,
2096    ) -> Result<bool> {
2097        // TODO: support key_range parameter
2098        let req = TriggerManualCompactionRequest {
2099            compaction_group_id,
2100            table_id,
2101            // if table_id not exist, manual_compaction will include all the sst
2102            // without check internal_table_id
2103            level,
2104            target_level,
2105            sst_ids,
2106            exclusive: Some(exclusive),
2107            ..Default::default()
2108        };
2109
2110        let resp = self.inner.trigger_manual_compaction(req).await?;
2111        Ok(resp.should_retry.unwrap_or(false))
2112    }
2113
2114    async fn trigger_full_gc(
2115        &self,
2116        sst_retention_time_sec: u64,
2117        prefix: Option<String>,
2118    ) -> Result<()> {
2119        self.inner
2120            .trigger_full_gc(TriggerFullGcRequest {
2121                sst_retention_time_sec,
2122                prefix,
2123            })
2124            .await?;
2125        Ok(())
2126    }
2127
2128    async fn subscribe_compaction_event(
2129        &self,
2130    ) -> Result<(
2131        UnboundedSender<SubscribeCompactionEventRequest>,
2132        BoxStream<'static, CompactionEventItem>,
2133    )> {
2134        let (request_sender, request_receiver) =
2135            unbounded_channel::<SubscribeCompactionEventRequest>();
2136        request_sender
2137            .send(SubscribeCompactionEventRequest {
2138                event: Some(subscribe_compaction_event_request::Event::Register(
2139                    Register {
2140                        context_id: self.worker_id(),
2141                    },
2142                )),
2143                create_at: SystemTime::now()
2144                    .duration_since(SystemTime::UNIX_EPOCH)
2145                    .expect("Clock may have gone backwards")
2146                    .as_millis() as u64,
2147            })
2148            .context("Failed to subscribe compaction event")?;
2149
2150        let stream = self
2151            .inner
2152            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
2153                request_receiver,
2154            )))
2155            .await?;
2156
2157        Ok((request_sender, Box::pin(stream)))
2158    }
2159
2160    async fn get_version_by_epoch(
2161        &self,
2162        epoch: HummockEpoch,
2163        table_id: TableId,
2164    ) -> Result<PbHummockVersion> {
2165        self.get_version_by_epoch(epoch, table_id).await
2166    }
2167
2168    async fn subscribe_iceberg_compaction_event(
2169        &self,
2170    ) -> Result<(
2171        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2172        BoxStream<'static, IcebergCompactionEventItem>,
2173    )> {
2174        let (request_sender, request_receiver) =
2175            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2176        request_sender
2177            .send(SubscribeIcebergCompactionEventRequest {
2178                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2179                    IcebergRegister {
2180                        context_id: self.worker_id(),
2181                    },
2182                )),
2183                create_at: SystemTime::now()
2184                    .duration_since(SystemTime::UNIX_EPOCH)
2185                    .expect("Clock may have gone backwards")
2186                    .as_millis() as u64,
2187            })
2188            .context("Failed to subscribe compaction event")?;
2189
2190        let stream = self
2191            .inner
2192            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2193                request_receiver,
2194            )))
2195            .await?;
2196
2197        Ok((request_sender, Box::pin(stream)))
2198    }
2199
2200    async fn get_table_change_logs(
2201        &self,
2202        epoch_only: bool,
2203        start_epoch_inclusive: Option<u64>,
2204        end_epoch_inclusive: Option<u64>,
2205        table_ids: Option<HashSet<TableId>>,
2206        exclude_empty: bool,
2207        limit: Option<u32>,
2208    ) -> Result<TableChangeLogs> {
2209        self.get_table_change_logs(
2210            epoch_only,
2211            start_epoch_inclusive,
2212            end_epoch_inclusive,
2213            table_ids,
2214            exclude_empty,
2215            limit,
2216        )
2217        .await
2218    }
2219}
2220
2221#[async_trait]
2222impl TelemetryInfoFetcher for MetaClient {
2223    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2224        let resp = self
2225            .get_telemetry_info()
2226            .await
2227            .map_err(|e| e.to_report_string())?;
2228        let tracking_id = resp.get_tracking_id().ok();
2229        Ok(tracking_id.map(|id| id.to_owned()))
2230    }
2231}
2232
/// Sink coordination service client over a tonic [`Channel`].
pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2234
/// The set of per-service gRPC clients used to talk to the meta server.
///
/// `GrpcMetaClientCore::new` builds every client from clones of one [`Channel`], so all
/// of them share the same underlying connection.
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}
2258
2259impl GrpcMetaClientCore {
2260    pub(crate) fn new(channel: Channel) -> Self {
2261        let cluster_client = ClusterServiceClient::new(channel.clone());
2262        let meta_member_client = MetaMemberClient::new(channel.clone());
2263        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2264        let ddl_client =
2265            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2266        let hummock_client =
2267            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2268        let notification_client =
2269            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2270        let stream_client =
2271            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2272        let user_client = UserServiceClient::new(channel.clone());
2273        let scale_client =
2274            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2275        let backup_client = BackupServiceClient::new(channel.clone());
2276        let telemetry_client =
2277            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2278        let system_params_client = SystemParamsServiceClient::new(channel.clone());
2279        let session_params_client = SessionParamServiceClient::new(channel.clone());
2280        let serving_client = ServingServiceClient::new(channel.clone());
2281        let cloud_client = CloudServiceClient::new(channel.clone());
2282        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2283            .max_decoding_message_size(usize::MAX);
2284        let event_log_client = EventLogServiceClient::new(channel.clone());
2285        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2286        let hosted_iceberg_catalog_service_client =
2287            HostedIcebergCatalogServiceClient::new(channel.clone());
2288        let monitor_client = configured_monitor_service_client(MonitorServiceClient::new(channel));
2289
2290        GrpcMetaClientCore {
2291            cluster_client,
2292            meta_member_client,
2293            heartbeat_client,
2294            ddl_client,
2295            hummock_client,
2296            notification_client,
2297            stream_client,
2298            user_client,
2299            scale_client,
2300            backup_client,
2301            telemetry_client,
2302            system_params_client,
2303            session_params_client,
2304            serving_client,
2305            cloud_client,
2306            sink_coordinate_client,
2307            event_log_client,
2308            cluster_limit_client,
2309            hosted_iceberg_catalog_service_client,
2310            monitor_client,
2311        }
2312    }
2313}
2314
/// Client to meta server. Cloning the instance is lightweight.
///
/// It is a wrapper of tonic client. See [`crate::meta_rpc_client_method_impl`].
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    /// Channel to the member monitor task. Each message is a oneshot sender on which the
    /// task reports the outcome of the refresh it performs (see `force_refresh_leader`).
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    /// Shared service clients; replaced as a whole by
    /// `MetaMemberManagement::recreate_core` when a new leader is discovered.
    core: Arc<RwLock<GrpcMetaClientCore>>,
}
2323
/// Meta member service client over a tonic [`Channel`].
type MetaMemberClient = MetaMemberServiceClient<Channel>;
2325
/// Known meta members, used when the client tracks a list of meta addresses.
struct MetaMemberGroup {
    /// Member URI to member client. An entry holds `None` until
    /// `MetaMemberManagement::refresh_members` first connects to that member.
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}
2329
/// State owned by the meta member monitor task (see
/// `GrpcMetaClient::start_meta_member_monitor`).
struct MetaMemberManagement {
    /// Handle to the client core that is rebuilt when the leader changes.
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    /// Either a single member client (`Left`) or a group of known members (`Right`).
    members: Either<MetaMemberClient, MetaMemberGroup>,
    /// URI of the leader the current core is connected to.
    current_leader: http::Uri,
    /// Meta configuration; `meta_leader_lease_secs` bounds the reconnect retries.
    meta_config: Arc<MetaConfig>,
}
2336
2337impl MetaMemberManagement {
2338    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2339
2340    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2341        format!("http://{}:{}", addr.host, addr.port)
2342            .parse()
2343            .unwrap()
2344    }
2345
2346    async fn recreate_core(&self, channel: Channel) {
2347        let mut core = self.core_ref.write().await;
2348        *core = GrpcMetaClientCore::new(channel);
2349    }
2350
2351    async fn refresh_members(&mut self) -> Result<()> {
2352        let leader_addr = match self.members.as_mut() {
2353            Either::Left(client) => {
2354                let resp = client
2355                    .to_owned()
2356                    .members(MembersRequest {})
2357                    .await
2358                    .map_err(RpcError::from_meta_status)?;
2359                let resp = resp.into_inner();
2360                resp.members.into_iter().find(|member| member.is_leader)
2361            }
2362            Either::Right(member_group) => {
2363                let mut fetched_members = None;
2364
2365                for (addr, client) in &mut member_group.members {
2366                    let members: Result<_> = try {
2367                        let mut client = match client {
2368                            Some(cached_client) => cached_client.to_owned(),
2369                            None => {
2370                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2371                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2372                                    .await
2373                                    .context("failed to create client")?;
2374                                let new_client: MetaMemberClient =
2375                                    MetaMemberServiceClient::new(channel);
2376                                *client = Some(new_client.clone());
2377
2378                                new_client
2379                            }
2380                        };
2381
2382                        let resp = client
2383                            .members(MembersRequest {})
2384                            .await
2385                            .context("failed to fetch members")?;
2386
2387                        resp.into_inner().members
2388                    };
2389
2390                    let fetched = members.is_ok();
2391                    fetched_members = Some(members);
2392                    if fetched {
2393                        break;
2394                    }
2395                }
2396
2397                let members = fetched_members
2398                    .context("no member available in the list")?
2399                    .context("could not refresh members")?;
2400
2401                // find new leader
2402                let mut leader = None;
2403                for member in members {
2404                    if member.is_leader {
2405                        leader = Some(member.clone());
2406                    }
2407
2408                    let addr = Self::host_address_to_uri(member.address.unwrap());
2409                    // We don't clean any expired addrs here to deal with some extreme situations.
2410                    if !member_group.members.contains(&addr) {
2411                        tracing::info!("new meta member joined: {}", addr);
2412                        member_group.members.put(addr, None);
2413                    }
2414                }
2415
2416                leader
2417            }
2418        };
2419
2420        if let Some(leader) = leader_addr {
2421            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2422
2423            if discovered_leader != self.current_leader {
2424                tracing::info!("new meta leader {} discovered", discovered_leader);
2425
2426                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2427                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2428                    false,
2429                );
2430
2431                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2432                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2433                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2434                })
2435                .await?;
2436
2437                self.recreate_core(channel).await;
2438                self.current_leader = discovered_leader;
2439            }
2440        }
2441
2442        Ok(())
2443    }
2444}
2445
2446impl GrpcMetaClient {
2447    // See `Endpoint::http2_keep_alive_interval`
2448    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2449    // See `Endpoint::keep_alive_timeout`
2450    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2451    // Retry base interval in ms for connecting to meta server.
2452    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    // Max retry interval in ms for connecting to meta server.
2454    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2455
2456    fn start_meta_member_monitor(
2457        &self,
2458        init_leader_addr: http::Uri,
2459        members: Either<MetaMemberClient, MetaMemberGroup>,
2460        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2461        meta_config: Arc<MetaConfig>,
2462    ) -> Result<()> {
2463        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2464        let current_leader = init_leader_addr;
2465
2466        let enable_period_tick = matches!(members, Either::Right(_));
2467
2468        let member_management = MetaMemberManagement {
2469            core_ref,
2470            members,
2471            current_leader,
2472            meta_config,
2473        };
2474
2475        let mut force_refresh_receiver = force_refresh_receiver;
2476
2477        tokio::spawn(async move {
2478            let mut member_management = member_management;
2479            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2480
2481            loop {
2482                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2483                    tokio::select! {
2484                        _ = ticker.tick() => None,
2485                        result_sender = force_refresh_receiver.recv() => {
2486                            if result_sender.is_none() {
2487                                break;
2488                            }
2489
2490                            result_sender
2491                        },
2492                    }
2493                } else {
2494                    let result_sender = force_refresh_receiver.recv().await;
2495
2496                    if result_sender.is_none() {
2497                        break;
2498                    }
2499
2500                    result_sender
2501                };
2502
2503                let tick_result = member_management.refresh_members().await;
2504                if let Err(e) = tick_result.as_ref() {
2505                    tracing::warn!(error = %e.as_report(),  "refresh meta member client failed");
2506                }
2507
2508                if let Some(sender) = event {
2509                    // ignore resp
2510                    let _resp = sender.send(tick_result);
2511                }
2512            }
2513        });
2514
2515        Ok(())
2516    }
2517
2518    async fn force_refresh_leader(&self) -> Result<()> {
2519        let (sender, receiver) = oneshot::channel();
2520
2521        self.member_monitor_event_sender
2522            .send(sender)
2523            .await
2524            .map_err(|e| anyhow!(e))?;
2525
2526        receiver.await.map_err(|e| anyhow!(e))?
2527    }
2528
2529    /// Connect to the meta server from `addrs`.
2530    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2531        let (channel, addr) = match strategy {
2532            MetaAddressStrategy::LoadBalance(addr) => {
2533                Self::try_build_rpc_channel(vec![addr.clone()]).await
2534            }
2535            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2536        }?;
2537        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2538        let client = GrpcMetaClient {
2539            member_monitor_event_sender: force_refresh_sender,
2540            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2541        };
2542
2543        let meta_member_client = client.core.read().await.meta_member_client.clone();
2544        let members = match strategy {
2545            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2546            MetaAddressStrategy::List(addrs) => {
2547                let mut members = LruCache::new(20.try_into().unwrap());
2548                for addr in addrs {
2549                    members.put(addr.clone(), None);
2550                }
2551                members.put(addr.clone(), Some(meta_member_client));
2552
2553                Either::Right(MetaMemberGroup { members })
2554            }
2555        };
2556
2557        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2558
2559        client.force_refresh_leader().await?;
2560
2561        Ok(client)
2562    }
2563
2564    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2565        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2566    }
2567
2568    pub(crate) async fn try_build_rpc_channel(
2569        addrs: impl IntoIterator<Item = http::Uri>,
2570    ) -> Result<(Channel, http::Uri)> {
2571        let endpoints: Vec<_> = addrs
2572            .into_iter()
2573            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2574            .collect();
2575
2576        let mut last_error = None;
2577
2578        for (endpoint, addr) in endpoints {
2579            match Self::connect_to_endpoint(endpoint).await {
2580                Ok(channel) => {
2581                    tracing::info!("Connect to meta server {} successfully", addr);
2582                    return Ok((channel, addr));
2583                }
2584                Err(e) => {
2585                    tracing::warn!(
2586                        error = %e.as_report(),
2587                        "Failed to connect to meta server {}, trying again",
2588                        addr,
2589                    );
2590                    last_error = Some(e);
2591                }
2592            }
2593        }
2594
2595        if let Some(last_error) = last_error {
2596            Err(anyhow::anyhow!(last_error)
2597                .context("failed to connect to all meta servers")
2598                .into())
2599        } else {
2600            bail!("no meta server address provided")
2601        }
2602    }
2603
2604    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2605        let channel = endpoint
2606            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2607            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2608            .connect_timeout(Duration::from_secs(5))
2609            .monitored_connect("grpc-meta-client", Default::default())
2610            .await?
2611            .wrapped();
2612
2613        Ok(channel)
2614    }
2615
2616    pub(crate) fn retry_strategy_to_bound(
2617        high_bound: Duration,
2618        exceed: bool,
2619    ) -> impl Iterator<Item = Duration> {
2620        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2621            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2622            .map(jitter);
2623
2624        let mut sum = Duration::default();
2625
2626        iter.take_while(move |duration| {
2627            sum += *duration;
2628
2629            if exceed {
2630                sum < high_bound + *duration
2631            } else {
2632                sum < high_bound
2633            }
2634        })
2635    }
2636}
2637
/// Invokes the macro named by the argument with the complete list of meta RPCs.
///
/// The callback receives a comma-separated list (no trailing comma) of entries
/// shaped as `{ client, rpc method, request type, response type }`. What is
/// generated from each entry is entirely up to the callback macro.
macro_rules! for_all_meta_rpc {
    ($callback:ident) => {
        $callback! {
            { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse },
            { cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse },
            { cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse },
            { cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse },
            { cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse },
            { cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse },
            { heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse },
            { stream_client, flush, FlushRequest, FlushResponse },
            { stream_client, pause, PauseRequest, PauseResponse },
            { stream_client, resume, ResumeRequest, ResumeResponse },
            { stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse },
            { stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse },
            { stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse },
            { stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse },
            { stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse },
            { stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse },
            { stream_client, list_sink_log_store_tables, ListSinkLogStoreTablesRequest, ListSinkLogStoreTablesResponse },
            { stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse },
            { stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse },
            { stream_client, recover, RecoverRequest, RecoverResponse },
            { stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse },
            { stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse },
            { stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse },
            { stream_client, list_iceberg_compaction_status, ListIcebergCompactionStatusRequest, ListIcebergCompactionStatusResponse },
            { stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse },
            { stream_client, alter_source_properties_safe, AlterSourcePropertiesSafeRequest, AlterSourcePropertiesSafeResponse },
            { stream_client, reset_source_splits, ResetSourceSplitsRequest, ResetSourceSplitsResponse },
            { stream_client, inject_source_offsets, InjectSourceOffsetsRequest, InjectSourceOffsetsResponse },
            { stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse },
            { stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse },
            { stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse },
            { stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse },
            { stream_client, refresh, RefreshRequest, RefreshResponse },
            { stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse },
            { ddl_client, create_table, CreateTableRequest, CreateTableResponse },
            { ddl_client, alter_name, AlterNameRequest, AlterNameResponse },
            { ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse },
            { ddl_client, alter_subscription_retention, AlterSubscriptionRetentionRequest, AlterSubscriptionRetentionResponse },
            { ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse },
            { ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse },
            { ddl_client, alter_backfill_parallelism, AlterBackfillParallelismRequest, AlterBackfillParallelismResponse },
            { ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse },
            { ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse },
            { ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse },
            { ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse },
            { ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse },
            { ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse },
            { ddl_client, create_view, CreateViewRequest, CreateViewResponse },
            { ddl_client, create_source, CreateSourceRequest, CreateSourceResponse },
            { ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse },
            { ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse },
            { ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse },
            { ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse },
            { ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse },
            { ddl_client, create_index, CreateIndexRequest, CreateIndexResponse },
            { ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse },
            { ddl_client, drop_table, DropTableRequest, DropTableResponse },
            { ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse },
            { ddl_client, drop_view, DropViewRequest, DropViewResponse },
            { ddl_client, drop_source, DropSourceRequest, DropSourceResponse },
            { ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse },
            { ddl_client, drop_secret, DropSecretRequest, DropSecretResponse },
            { ddl_client, drop_sink, DropSinkRequest, DropSinkResponse },
            { ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse },
            { ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse },
            { ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse },
            { ddl_client, drop_index, DropIndexRequest, DropIndexResponse },
            { ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse },
            { ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse },
            { ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse },
            { ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse },
            { ddl_client, risectl_resume_backfill, RisectlResumeBackfillRequest, RisectlResumeBackfillResponse },
            { ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse },
            { ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse },
            { ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse },
            { ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse },
            { ddl_client, comment_on, CommentOnRequest, CommentOnResponse },
            { ddl_client, get_tables, GetTablesRequest, GetTablesResponse },
            { ddl_client, wait, WaitRequest, WaitResponse },
            { ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse },
            { ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse },
            { ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse },
            { ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse },
            { ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse },
            { ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse },
            { hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse },
            { hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse },
            { hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse },
            { hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse },
            { hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse },
            { hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse },
            { hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse },
            { hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse },
            { hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse },
            { hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse },
            { hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse },
            { hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse },
            { hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse },
            { hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse },
            { hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse },
            { hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse },
            { hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse },
            { hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse },
            { hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse },
            { hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse },
            { hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse },
            { hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> },
            { hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> },
            { hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse },
            { hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse },
            { hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse },
            { hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse },
            { hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse },
            { hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse },
            { hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse },
            { hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse },
            { hummock_client, get_table_change_logs, GetTableChangeLogsRequest, GetTableChangeLogsResponse },
            { user_client, create_user, CreateUserRequest, CreateUserResponse },
            { user_client, update_user, UpdateUserRequest, UpdateUserResponse },
            { user_client, drop_user, DropUserRequest, DropUserResponse },
            { user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse },
            { user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse },
            { user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse },
            { scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse },
            { scale_client, reschedule, RescheduleRequest, RescheduleResponse },
            { notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> },
            { backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse },
            { backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse },
            { backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse },
            { backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse },
            { telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse },
            { system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse },
            { system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse },
            { session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse },
            { session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse },
            { serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse },
            { cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse },
            { event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse },
            { event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse },
            { cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse },
            { hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse },
            { monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}
2786
2787impl GrpcMetaClient {
2788    async fn refresh_client_if_needed(&self, code: Code) {
2789        if matches!(
2790            code,
2791            Code::Unknown | Code::Unimplemented | Code::Unavailable
2792        ) {
2793            tracing::debug!("matching tonic code {}", code);
2794            let (result_sender, result_receiver) = oneshot::channel();
2795            if self
2796                .member_monitor_event_sender
2797                .try_send(result_sender)
2798                .is_ok()
2799            {
2800                if let Ok(Err(e)) = result_receiver.await {
2801                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2802                }
2803            } else {
2804                tracing::debug!("skipping the current refresh, somewhere else is already doing it")
2805            }
2806        }
2807    }
2808}
2809
2810impl GrpcMetaClient {
2811    for_all_meta_rpc! { meta_rpc_client_method_impl }
2812}