Skip to main content

risingwave_rpc_client/
meta_client.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{
33    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
34};
35use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
36use risingwave_common::hash::WorkerSlotMapping;
37use risingwave_common::id::{
38    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, UserId, ViewId, WorkerId,
39};
40use risingwave_common::monitor::EndpointExt;
41use risingwave_common::system_param::AdaptiveParallelismStrategy;
42use risingwave_common::system_param::reader::SystemParamsReader;
43use risingwave_common::telemetry::report::TelemetryInfoFetcher;
44use risingwave_common::util::addr::HostAddr;
45use risingwave_common::util::meta_addr::MetaAddressStrategy;
46use risingwave_common::util::resource_util::cpu::total_cpu_available;
47use risingwave_common::util::resource_util::hostname;
48use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
49use risingwave_error::bail;
50use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
51use risingwave_hummock_sdk::change_log::{TableChangeLog, TableChangeLogs};
52use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
53use risingwave_hummock_sdk::{
54    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
55};
56use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
57use risingwave_pb::backup_service::*;
58use risingwave_pb::catalog::{
59    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
60    PbSubscription, PbTable, PbView, Table,
61};
62use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
63use risingwave_pb::cloud_service::*;
64use risingwave_pb::common::worker_node::Property;
65use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
66use risingwave_pb::configured_monitor_service_client;
67use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
68use risingwave_pb::ddl_service::alter_owner_request::Object;
69use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
70use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
71use risingwave_pb::ddl_service::*;
72use risingwave_pb::hummock::compact_task::TaskStatus;
73use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
74use risingwave_pb::hummock::get_table_change_logs_request::PbTableFilter;
75use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
76use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
77use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
78use risingwave_pb::hummock::write_limits::WriteLimit;
79use risingwave_pb::hummock::*;
80use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
81use risingwave_pb::iceberg_compaction::{
82    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
83    subscribe_iceberg_compaction_event_request,
84};
85use risingwave_pb::id::{ActorId, FragmentId, HummockSstableId, SourceId};
86use risingwave_pb::meta::alter_connector_props_request::{
87    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
88};
89use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
90use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
91use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
92use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
93use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
94use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
95use risingwave_pb::meta::list_actor_states_response::ActorState;
96use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
97use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
98use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
99use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
100use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
101use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
102use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
103use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
104use risingwave_pb::meta::serving_service_client::ServingServiceClient;
105use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
106use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
107use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
108use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
109use risingwave_pb::meta::{FragmentDistribution, *};
110use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
111use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
112use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
113use risingwave_pb::secret::PbSecretRef;
114use risingwave_pb::stream_plan::StreamFragmentGraph;
115use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
116use risingwave_pb::user::update_user_request::UpdateField;
117use risingwave_pb::user::user_service_client::UserServiceClient;
118use risingwave_pb::user::*;
119use thiserror_ext::AsReport;
120use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
121use tokio::sync::oneshot::Sender;
122use tokio::sync::{RwLock, mpsc, oneshot};
123use tokio::task::JoinHandle;
124use tokio::time::{self};
125use tokio_retry::strategy::{ExponentialBackoff, jitter};
126use tokio_stream::wrappers::UnboundedReceiverStream;
127use tonic::transport::Endpoint;
128use tonic::{Code, Request, Streaming};
129
130use crate::channel::{Channel, WrappedChannelExt};
131use crate::error::{Result, RpcError};
132use crate::hummock_meta_client::{
133    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
134    IcebergCompactionEventItem,
135};
136use crate::meta_rpc_client_method_impl;
137
/// Client to meta server. Cloning the instance is lightweight.
#[derive(Clone, Debug)]
pub struct MetaClient {
    /// Id of this worker, taken from the `AddWorkerNodeResponse` during registration.
    worker_id: WorkerId,
    /// Worker type this client registered as.
    worker_type: WorkerType,
    /// Address of this node, as reported to meta in registration and subscribe requests.
    host_addr: HostAddr,
    /// Underlying gRPC client that all RPC methods are forwarded to.
    inner: GrpcMetaClient,
    /// Meta configuration; `max_heartbeat_interval_secs` bounds the retry strategies.
    meta_config: Arc<MetaConfig>,
    /// Cluster id, taken from the `AddWorkerNodeResponse` during registration.
    cluster_id: String,
    /// Shared flag read by `send_heartbeat`: when set, an "unknown worker" heartbeat
    /// status no longer terminates the process.
    shutting_down: Arc<AtomicBool>,
}
149
150impl MetaClient {
151    pub fn worker_id(&self) -> WorkerId {
152        self.worker_id
153    }
154
155    pub fn host_addr(&self) -> &HostAddr {
156        &self.host_addr
157    }
158
159    pub fn worker_type(&self) -> WorkerType {
160        self.worker_type
161    }
162
163    pub fn cluster_id(&self) -> &str {
164        &self.cluster_id
165    }
166
167    /// Subscribe to notification from meta.
168    pub async fn subscribe(
169        &self,
170        subscribe_type: SubscribeType,
171    ) -> Result<Streaming<SubscribeResponse>> {
172        let request = SubscribeRequest {
173            subscribe_type: subscribe_type as i32,
174            host: Some(self.host_addr.to_protobuf()),
175            worker_id: self.worker_id(),
176        };
177
178        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
179            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
180            true,
181        );
182
183        tokio_retry::Retry::spawn(retry_strategy, || async {
184            let request = request.clone();
185            self.inner.subscribe(request).await
186        })
187        .await
188    }
189
190    pub async fn create_connection(
191        &self,
192        connection_name: String,
193        database_id: DatabaseId,
194        schema_id: SchemaId,
195        owner_id: UserId,
196        req: create_connection_request::Payload,
197    ) -> Result<WaitVersion> {
198        let request = CreateConnectionRequest {
199            name: connection_name,
200            database_id,
201            schema_id,
202            owner_id,
203            payload: Some(req),
204        };
205        let resp = self.inner.create_connection(request).await?;
206        Ok(resp
207            .version
208            .ok_or_else(|| anyhow!("wait version not set"))?)
209    }
210
211    pub async fn create_secret(
212        &self,
213        secret_name: String,
214        database_id: DatabaseId,
215        schema_id: SchemaId,
216        owner_id: UserId,
217        value: Vec<u8>,
218    ) -> Result<WaitVersion> {
219        let request = CreateSecretRequest {
220            name: secret_name,
221            database_id,
222            schema_id,
223            owner_id,
224            value,
225        };
226        let resp = self.inner.create_secret(request).await?;
227        Ok(resp
228            .version
229            .ok_or_else(|| anyhow!("wait version not set"))?)
230    }
231
232    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
233        let request = ListConnectionsRequest {};
234        let resp = self.inner.list_connections(request).await?;
235        Ok(resp.connections)
236    }
237
238    pub async fn drop_connection(
239        &self,
240        connection_id: ConnectionId,
241        cascade: bool,
242    ) -> Result<WaitVersion> {
243        let request = DropConnectionRequest {
244            connection_id,
245            cascade,
246        };
247        let resp = self.inner.drop_connection(request).await?;
248        Ok(resp
249            .version
250            .ok_or_else(|| anyhow!("wait version not set"))?)
251    }
252
253    pub async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<WaitVersion> {
254        let request = DropSecretRequest { secret_id, cascade };
255        let resp = self.inner.drop_secret(request).await?;
256        Ok(resp
257            .version
258            .ok_or_else(|| anyhow!("wait version not set"))?)
259    }
260
261    /// Register the current node to the cluster and set the corresponding worker id.
262    ///
263    /// Retry if there's connection issue with the meta node. Exit the process if the registration fails.
264    pub async fn register_new(
265        addr_strategy: MetaAddressStrategy,
266        worker_type: WorkerType,
267        addr: &HostAddr,
268        property: Property,
269        meta_config: Arc<MetaConfig>,
270    ) -> (Self, SystemParamsReader) {
271        let ret =
272            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
273
274        match ret {
275            Ok(ret) => ret,
276            Err(err) => {
277                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
278                std::process::exit(1);
279            }
280        }
281    }
282
283    async fn register_new_inner(
284        addr_strategy: MetaAddressStrategy,
285        worker_type: WorkerType,
286        addr: &HostAddr,
287        property: Property,
288        meta_config: Arc<MetaConfig>,
289    ) -> Result<(Self, SystemParamsReader)> {
290        tracing::info!("register meta client using strategy: {}", addr_strategy);
291
292        // Retry until reaching `max_heartbeat_interval_secs`
293        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
294            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
295            true,
296        );
297
298        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
299            retry_strategy,
300            || async {
301                let grpc_meta_client =
302                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
303
304                let add_worker_resp = grpc_meta_client
305                    .add_worker_node(AddWorkerNodeRequest {
306                        worker_type: worker_type as i32,
307                        host: Some(addr.to_protobuf()),
308                        property: Some(property.clone()),
309                        resource: Some(risingwave_pb::common::worker_node::Resource {
310                            rw_version: RW_VERSION.to_owned(),
311                            total_memory_bytes: system_memory_available_bytes() as _,
312                            total_cpu_cores: total_cpu_available() as _,
313                            hostname: hostname(),
314                        }),
315                    })
316                    .await
317                    .context("failed to add worker node")?;
318
319                let system_params_resp = grpc_meta_client
320                    .get_system_params(GetSystemParamsRequest {})
321                    .await
322                    .context("failed to get initial system params")?;
323
324                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
325            },
326            // Only retry if there's any transient connection issue.
327            // If the error is from our implementation or business, do not retry it.
328            |e: &RpcError| !e.is_from_tonic_server_impl(),
329        )
330        .await;
331
332        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
333        let worker_id = add_worker_resp
334            .node_id
335            .expect("AddWorkerNodeResponse::node_id is empty");
336
337        let meta_client = Self {
338            worker_id,
339            worker_type,
340            host_addr: addr.clone(),
341            inner: grpc_meta_client,
342            meta_config: meta_config.clone(),
343            cluster_id: add_worker_resp.cluster_id,
344            shutting_down: Arc::new(false.into()),
345        };
346
347        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
348        REPORT_PANIC.call_once(|| {
349            let meta_client_clone = meta_client.clone();
350            std::panic::update_hook(move |default_hook, info| {
351                // Try to report panic event to meta node.
352                meta_client_clone.try_add_panic_event_blocking(info, None);
353                default_hook(info);
354            });
355        });
356
357        Ok((meta_client, system_params_resp.params.unwrap().into()))
358    }
359
360    /// Activate the current node in cluster to confirm it's ready to serve.
361    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
362        let request = ActivateWorkerNodeRequest {
363            host: Some(addr.to_protobuf()),
364            node_id: self.worker_id,
365        };
366        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
367            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
368            true,
369        );
370        tokio_retry::Retry::spawn(retry_strategy, || async {
371            let request = request.clone();
372            self.inner.activate_worker_node(request).await
373        })
374        .await?;
375
376        Ok(())
377    }
378
379    /// Send heartbeat signal to meta service.
380    pub async fn send_heartbeat(&self) -> Result<()> {
381        let request = HeartbeatRequest {
382            node_id: self.worker_id,
383            resource: Some(risingwave_pb::common::worker_node::Resource {
384                rw_version: RW_VERSION.to_owned(),
385                total_memory_bytes: system_memory_available_bytes() as _,
386                total_cpu_cores: total_cpu_available() as _,
387                hostname: hostname(),
388            }),
389        };
390        let resp = self.inner.heartbeat(request).await?;
391        if let Some(status) = resp.status
392            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
393        {
394            // Ignore the error if we're already shutting down.
395            // Otherwise, exit the process.
396            if !self.shutting_down.load(Relaxed) {
397                tracing::error!(message = status.message, "worker expired");
398                std::process::exit(1);
399            }
400        }
401        Ok(())
402    }
403
404    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
405        let request = CreateDatabaseRequest { db: Some(db) };
406        let resp = self.inner.create_database(request).await?;
407        // TODO: handle error in `resp.status` here
408        Ok(resp
409            .version
410            .ok_or_else(|| anyhow!("wait version not set"))?)
411    }
412
413    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
414        let request = CreateSchemaRequest {
415            schema: Some(schema),
416        };
417        let resp = self.inner.create_schema(request).await?;
418        // TODO: handle error in `resp.status` here
419        Ok(resp
420            .version
421            .ok_or_else(|| anyhow!("wait version not set"))?)
422    }
423
424    pub async fn create_materialized_view(
425        &self,
426        table: PbTable,
427        graph: StreamFragmentGraph,
428        dependencies: HashSet<ObjectId>,
429        resource_type: streaming_job_resource_type::ResourceType,
430        if_not_exists: bool,
431        refresh_interval_sec: Option<u64>,
432    ) -> Result<WaitVersion> {
433        let request = CreateMaterializedViewRequest {
434            materialized_view: Some(table),
435            fragment_graph: Some(graph),
436            resource_type: Some(PbStreamingJobResourceType {
437                resource_type: Some(resource_type),
438            }),
439            dependencies: dependencies.into_iter().collect(),
440            if_not_exists,
441            refresh_interval_sec,
442        };
443        let resp = self.inner.create_materialized_view(request).await?;
444        // TODO: handle error in `resp.status` here
445        Ok(resp
446            .version
447            .ok_or_else(|| anyhow!("wait version not set"))?)
448    }
449
450    pub async fn drop_materialized_view(
451        &self,
452        table_id: TableId,
453        cascade: bool,
454    ) -> Result<WaitVersion> {
455        let request = DropMaterializedViewRequest { table_id, cascade };
456
457        let resp = self.inner.drop_materialized_view(request).await?;
458        Ok(resp
459            .version
460            .ok_or_else(|| anyhow!("wait version not set"))?)
461    }
462
463    pub async fn create_source(
464        &self,
465        source: PbSource,
466        graph: Option<StreamFragmentGraph>,
467        if_not_exists: bool,
468    ) -> Result<WaitVersion> {
469        let request = CreateSourceRequest {
470            source: Some(source),
471            fragment_graph: graph,
472            if_not_exists,
473        };
474
475        let resp = self.inner.create_source(request).await?;
476        Ok(resp
477            .version
478            .ok_or_else(|| anyhow!("wait version not set"))?)
479    }
480
481    pub async fn create_sink(
482        &self,
483        sink: PbSink,
484        graph: StreamFragmentGraph,
485        dependencies: HashSet<ObjectId>,
486        resource_type: streaming_job_resource_type::ResourceType,
487        if_not_exists: bool,
488    ) -> Result<WaitVersion> {
489        let request = CreateSinkRequest {
490            sink: Some(sink),
491            fragment_graph: Some(graph),
492            dependencies: dependencies.into_iter().collect(),
493            if_not_exists,
494            resource_type: Some(PbStreamingJobResourceType {
495                resource_type: Some(resource_type),
496            }),
497        };
498
499        let resp = self.inner.create_sink(request).await?;
500        Ok(resp
501            .version
502            .ok_or_else(|| anyhow!("wait version not set"))?)
503    }
504
505    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
506        let request = CreateSubscriptionRequest {
507            subscription: Some(subscription),
508        };
509
510        let resp = self.inner.create_subscription(request).await?;
511        Ok(resp
512            .version
513            .ok_or_else(|| anyhow!("wait version not set"))?)
514    }
515
516    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
517        let request = CreateFunctionRequest {
518            function: Some(function),
519        };
520        let resp = self.inner.create_function(request).await?;
521        Ok(resp
522            .version
523            .ok_or_else(|| anyhow!("wait version not set"))?)
524    }
525
526    pub async fn create_table(
527        &self,
528        source: Option<PbSource>,
529        table: PbTable,
530        graph: StreamFragmentGraph,
531        job_type: PbTableJobType,
532        if_not_exists: bool,
533        dependencies: HashSet<ObjectId>,
534    ) -> Result<WaitVersion> {
535        let request = CreateTableRequest {
536            materialized_view: Some(table),
537            fragment_graph: Some(graph),
538            source,
539            job_type: job_type as _,
540            if_not_exists,
541            dependencies: dependencies.into_iter().collect(),
542        };
543        let resp = self.inner.create_table(request).await?;
544        // TODO: handle error in `resp.status` here
545        Ok(resp
546            .version
547            .ok_or_else(|| anyhow!("wait version not set"))?)
548    }
549
550    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
551        let request = CommentOnRequest {
552            comment: Some(comment),
553        };
554        let resp = self.inner.comment_on(request).await?;
555        Ok(resp
556            .version
557            .ok_or_else(|| anyhow!("wait version not set"))?)
558    }
559
560    pub async fn alter_name(
561        &self,
562        object: alter_name_request::Object,
563        name: &str,
564    ) -> Result<WaitVersion> {
565        let request = AlterNameRequest {
566            object: Some(object),
567            new_name: name.to_owned(),
568        };
569        let resp = self.inner.alter_name(request).await?;
570        Ok(resp
571            .version
572            .ok_or_else(|| anyhow!("wait version not set"))?)
573    }
574
575    pub async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<WaitVersion> {
576        let request = AlterOwnerRequest {
577            object: Some(object),
578            owner_id,
579        };
580        let resp = self.inner.alter_owner(request).await?;
581        Ok(resp
582            .version
583            .ok_or_else(|| anyhow!("wait version not set"))?)
584    }
585
586    pub async fn alter_subscription_retention(
587        &self,
588        subscription_id: SubscriptionId,
589        retention_seconds: u64,
590        definition: String,
591    ) -> Result<WaitVersion> {
592        let request = AlterSubscriptionRetentionRequest {
593            subscription_id,
594            retention_seconds,
595            definition,
596        };
597        let resp = self.inner.alter_subscription_retention(request).await?;
598        Ok(resp
599            .version
600            .ok_or_else(|| anyhow!("wait version not set"))?)
601    }
602
603    pub async fn alter_database_param(
604        &self,
605        database_id: DatabaseId,
606        param: AlterDatabaseParam,
607    ) -> Result<WaitVersion> {
608        let request = match param {
609            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
610                let barrier_interval_ms = OptionalUint32 {
611                    value: barrier_interval_ms,
612                };
613                AlterDatabaseParamRequest {
614                    database_id,
615                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
616                        barrier_interval_ms,
617                    )),
618                }
619            }
620            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
621                let checkpoint_frequency = OptionalUint64 {
622                    value: checkpoint_frequency,
623                };
624                AlterDatabaseParamRequest {
625                    database_id,
626                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
627                        checkpoint_frequency,
628                    )),
629                }
630            }
631        };
632        let resp = self.inner.alter_database_param(request).await?;
633        Ok(resp
634            .version
635            .ok_or_else(|| anyhow!("wait version not set"))?)
636    }
637
638    pub async fn alter_set_schema(
639        &self,
640        object: alter_set_schema_request::Object,
641        new_schema_id: SchemaId,
642    ) -> Result<WaitVersion> {
643        let request = AlterSetSchemaRequest {
644            new_schema_id,
645            object: Some(object),
646        };
647        let resp = self.inner.alter_set_schema(request).await?;
648        Ok(resp
649            .version
650            .ok_or_else(|| anyhow!("wait version not set"))?)
651    }
652
653    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
654        let request = AlterSourceRequest {
655            source: Some(source),
656        };
657        let resp = self.inner.alter_source(request).await?;
658        Ok(resp
659            .version
660            .ok_or_else(|| anyhow!("wait version not set"))?)
661    }
662
663    pub async fn alter_parallelism(
664        &self,
665        job_id: JobId,
666        parallelism: PbTableParallelism,
667        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
668        deferred: bool,
669    ) -> Result<()> {
670        let request = AlterParallelismRequest {
671            table_id: job_id,
672            parallelism: Some(parallelism),
673            deferred,
674            adaptive_parallelism_strategy: adaptive_parallelism_strategy
675                .map(|strategy| strategy.to_string()),
676        };
677
678        self.inner.alter_parallelism(request).await?;
679        Ok(())
680    }
681
682    pub async fn alter_backfill_parallelism(
683        &self,
684        job_id: JobId,
685        parallelism: Option<PbTableParallelism>,
686        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
687        deferred: bool,
688    ) -> Result<()> {
689        let request = AlterBackfillParallelismRequest {
690            table_id: job_id,
691            parallelism,
692            deferred,
693            adaptive_parallelism_strategy: adaptive_parallelism_strategy
694                .map(|strategy| strategy.to_string()),
695        };
696
697        self.inner.alter_backfill_parallelism(request).await?;
698        Ok(())
699    }
700
701    pub async fn alter_streaming_job_config(
702        &self,
703        job_id: JobId,
704        entries_to_add: HashMap<String, String>,
705        keys_to_remove: Vec<String>,
706    ) -> Result<()> {
707        let request = AlterStreamingJobConfigRequest {
708            job_id,
709            entries_to_add,
710            keys_to_remove,
711        };
712
713        self.inner.alter_streaming_job_config(request).await?;
714        Ok(())
715    }
716
717    pub async fn alter_fragment_parallelism(
718        &self,
719        fragment_ids: Vec<FragmentId>,
720        parallelism: Option<PbTableParallelism>,
721    ) -> Result<()> {
722        let request = AlterFragmentParallelismRequest {
723            fragment_ids,
724            parallelism,
725        };
726
727        self.inner.alter_fragment_parallelism(request).await?;
728        Ok(())
729    }
730
731    pub async fn alter_cdc_table_backfill_parallelism(
732        &self,
733        table_id: JobId,
734        parallelism: PbTableParallelism,
735    ) -> Result<()> {
736        let request = AlterCdcTableBackfillParallelismRequest {
737            table_id,
738            parallelism: Some(parallelism),
739        };
740        self.inner
741            .alter_cdc_table_backfill_parallelism(request)
742            .await?;
743        Ok(())
744    }
745
746    pub async fn alter_resource_group(
747        &self,
748        job_id: JobId,
749        resource_group: Option<String>,
750        deferred: bool,
751    ) -> Result<()> {
752        let request = AlterResourceGroupRequest {
753            job_id,
754            resource_group,
755            deferred,
756        };
757
758        self.inner.alter_resource_group(request).await?;
759        Ok(())
760    }
761
762    pub async fn alter_database_resource_group(
763        &self,
764        database_id: DatabaseId,
765        resource_group: Option<String>,
766        deferred: bool,
767    ) -> Result<WaitVersion> {
768        let request = AlterDatabaseResourceGroupRequest {
769            database_id,
770            resource_group,
771            deferred,
772        };
773
774        let resp = self.inner.alter_database_resource_group(request).await?;
775        Ok(resp
776            .version
777            .ok_or_else(|| anyhow!("wait version not set"))?)
778    }
779
780    pub async fn alter_swap_rename(
781        &self,
782        object: alter_swap_rename_request::Object,
783    ) -> Result<WaitVersion> {
784        let request = AlterSwapRenameRequest {
785            object: Some(object),
786        };
787        let resp = self.inner.alter_swap_rename(request).await?;
788        Ok(resp
789            .version
790            .ok_or_else(|| anyhow!("wait version not set"))?)
791    }
792
793    pub async fn alter_secret(
794        &self,
795        secret_id: SecretId,
796        secret_name: String,
797        database_id: DatabaseId,
798        schema_id: SchemaId,
799        owner_id: UserId,
800        value: Vec<u8>,
801    ) -> Result<WaitVersion> {
802        let request = AlterSecretRequest {
803            secret_id,
804            name: secret_name,
805            database_id,
806            schema_id,
807            owner_id,
808            value,
809        };
810        let resp = self.inner.alter_secret(request).await?;
811        Ok(resp
812            .version
813            .ok_or_else(|| anyhow!("wait version not set"))?)
814    }
815
816    pub async fn replace_job(
817        &self,
818        graph: StreamFragmentGraph,
819        replace_job: ReplaceJob,
820    ) -> Result<WaitVersion> {
821        let request = ReplaceJobPlanRequest {
822            plan: Some(ReplaceJobPlan {
823                fragment_graph: Some(graph),
824                replace_job: Some(replace_job),
825            }),
826        };
827        let resp = self.inner.replace_job_plan(request).await?;
828        // TODO: handle error in `resp.status` here
829        Ok(resp
830            .version
831            .ok_or_else(|| anyhow!("wait version not set"))?)
832    }
833
834    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
835        let request = AutoSchemaChangeRequest {
836            schema_change: Some(schema_change),
837        };
838        let _ = self.inner.auto_schema_change(request).await?;
839        Ok(())
840    }
841
842    pub async fn create_view(
843        &self,
844        view: PbView,
845        dependencies: HashSet<ObjectId>,
846    ) -> Result<WaitVersion> {
847        let request = CreateViewRequest {
848            view: Some(view),
849            dependencies: dependencies.into_iter().collect(),
850        };
851        let resp = self.inner.create_view(request).await?;
852        // TODO: handle error in `resp.status` here
853        Ok(resp
854            .version
855            .ok_or_else(|| anyhow!("wait version not set"))?)
856    }
857
858    pub async fn create_index(
859        &self,
860        index: PbIndex,
861        table: PbTable,
862        graph: StreamFragmentGraph,
863        resource_type: streaming_job_resource_type::ResourceType,
864        if_not_exists: bool,
865    ) -> Result<WaitVersion> {
866        let request = CreateIndexRequest {
867            index: Some(index),
868            index_table: Some(table),
869            fragment_graph: Some(graph),
870            if_not_exists,
871            resource_type: Some(PbStreamingJobResourceType {
872                resource_type: Some(resource_type),
873            }),
874        };
875        let resp = self.inner.create_index(request).await?;
876        // TODO: handle error in `resp.status` here
877        Ok(resp
878            .version
879            .ok_or_else(|| anyhow!("wait version not set"))?)
880    }
881
882    pub async fn drop_table(
883        &self,
884        source_id: Option<SourceId>,
885        table_id: TableId,
886        cascade: bool,
887    ) -> Result<WaitVersion> {
888        let request = DropTableRequest {
889            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
890            table_id,
891            cascade,
892        };
893
894        let resp = self.inner.drop_table(request).await?;
895        Ok(resp
896            .version
897            .ok_or_else(|| anyhow!("wait version not set"))?)
898    }
899
900    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
901        let request = CompactIcebergTableRequest { sink_id };
902        let resp = self.inner.compact_iceberg_table(request).await?;
903        Ok(resp.task_id)
904    }
905
906    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
907        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
908        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
909        Ok(())
910    }
911
912    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
913        let request = DropViewRequest { view_id, cascade };
914        let resp = self.inner.drop_view(request).await?;
915        Ok(resp
916            .version
917            .ok_or_else(|| anyhow!("wait version not set"))?)
918    }
919
920    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
921        let request = DropSourceRequest { source_id, cascade };
922        let resp = self.inner.drop_source(request).await?;
923        Ok(resp
924            .version
925            .ok_or_else(|| anyhow!("wait version not set"))?)
926    }
927
928    pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
929        let request = ResetSourceRequest { source_id };
930        let resp = self.inner.reset_source(request).await?;
931        Ok(resp
932            .version
933            .ok_or_else(|| anyhow!("wait version not set"))?)
934    }
935
936    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
937        let request = DropSinkRequest { sink_id, cascade };
938        let resp = self.inner.drop_sink(request).await?;
939        Ok(resp
940            .version
941            .ok_or_else(|| anyhow!("wait version not set"))?)
942    }
943
944    pub async fn drop_subscription(
945        &self,
946        subscription_id: SubscriptionId,
947        cascade: bool,
948    ) -> Result<WaitVersion> {
949        let request = DropSubscriptionRequest {
950            subscription_id,
951            cascade,
952        };
953        let resp = self.inner.drop_subscription(request).await?;
954        Ok(resp
955            .version
956            .ok_or_else(|| anyhow!("wait version not set"))?)
957    }
958
959    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
960        let request = DropIndexRequest { index_id, cascade };
961        let resp = self.inner.drop_index(request).await?;
962        Ok(resp
963            .version
964            .ok_or_else(|| anyhow!("wait version not set"))?)
965    }
966
967    pub async fn drop_function(
968        &self,
969        function_id: FunctionId,
970        cascade: bool,
971    ) -> Result<WaitVersion> {
972        let request = DropFunctionRequest {
973            function_id,
974            cascade,
975        };
976        let resp = self.inner.drop_function(request).await?;
977        Ok(resp
978            .version
979            .ok_or_else(|| anyhow!("wait version not set"))?)
980    }
981
982    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
983        let request = DropDatabaseRequest { database_id };
984        let resp = self.inner.drop_database(request).await?;
985        Ok(resp
986            .version
987            .ok_or_else(|| anyhow!("wait version not set"))?)
988    }
989
990    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
991        let request = DropSchemaRequest { schema_id, cascade };
992        let resp = self.inner.drop_schema(request).await?;
993        Ok(resp
994            .version
995            .ok_or_else(|| anyhow!("wait version not set"))?)
996    }
997
998    // TODO: using UserInfoVersion instead as return type.
999    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
1000        let request = CreateUserRequest { user: Some(user) };
1001        let resp = self.inner.create_user(request).await?;
1002        Ok(resp.version)
1003    }
1004
1005    pub async fn drop_user(&self, user_id: UserId) -> Result<u64> {
1006        let request = DropUserRequest { user_id };
1007        let resp = self.inner.drop_user(request).await?;
1008        Ok(resp.version)
1009    }
1010
1011    pub async fn update_user(
1012        &self,
1013        user: UserInfo,
1014        update_fields: Vec<UpdateField>,
1015    ) -> Result<u64> {
1016        let request = UpdateUserRequest {
1017            user: Some(user),
1018            update_fields: update_fields
1019                .into_iter()
1020                .map(|field| field as i32)
1021                .collect::<Vec<_>>(),
1022        };
1023        let resp = self.inner.update_user(request).await?;
1024        Ok(resp.version)
1025    }
1026
1027    pub async fn grant_privilege(
1028        &self,
1029        user_ids: Vec<UserId>,
1030        privileges: Vec<GrantPrivilege>,
1031        with_grant_option: bool,
1032        granted_by: UserId,
1033    ) -> Result<u64> {
1034        let request = GrantPrivilegeRequest {
1035            user_ids,
1036            privileges,
1037            with_grant_option,
1038            granted_by,
1039        };
1040        let resp = self.inner.grant_privilege(request).await?;
1041        Ok(resp.version)
1042    }
1043
1044    pub async fn revoke_privilege(
1045        &self,
1046        user_ids: Vec<UserId>,
1047        privileges: Vec<GrantPrivilege>,
1048        granted_by: UserId,
1049        revoke_by: UserId,
1050        revoke_grant_option: bool,
1051        cascade: bool,
1052    ) -> Result<u64> {
1053        let request = RevokePrivilegeRequest {
1054            user_ids,
1055            privileges,
1056            granted_by,
1057            revoke_by,
1058            revoke_grant_option,
1059            cascade,
1060        };
1061        let resp = self.inner.revoke_privilege(request).await?;
1062        Ok(resp.version)
1063    }
1064
1065    pub async fn alter_default_privilege(
1066        &self,
1067        user_ids: Vec<UserId>,
1068        database_id: DatabaseId,
1069        schema_ids: Vec<SchemaId>,
1070        operation: AlterDefaultPrivilegeOperation,
1071        granted_by: UserId,
1072    ) -> Result<()> {
1073        let request = AlterDefaultPrivilegeRequest {
1074            user_ids,
1075            database_id,
1076            schema_ids,
1077            operation: Some(operation),
1078            granted_by,
1079        };
1080        self.inner.alter_default_privilege(request).await?;
1081        Ok(())
1082    }
1083
1084    /// Unregister the current node from the cluster.
1085    pub async fn unregister(&self) -> Result<()> {
1086        let request = DeleteWorkerNodeRequest {
1087            host: Some(self.host_addr.to_protobuf()),
1088        };
1089        self.inner.delete_worker_node(request).await?;
1090        self.shutting_down.store(true, Relaxed);
1091        Ok(())
1092    }
1093
1094    /// Try to unregister the current worker from the cluster with best effort. Log the result.
1095    pub async fn try_unregister(&self) {
1096        match self.unregister().await {
1097            Ok(_) => {
1098                tracing::info!(
1099                    worker_id = %self.worker_id(),
1100                    "successfully unregistered from meta service",
1101                )
1102            }
1103            Err(e) => {
1104                tracing::warn!(
1105                    error = %e.as_report(),
1106                    worker_id = %self.worker_id(),
1107                    "failed to unregister from meta service",
1108                );
1109            }
1110        }
1111    }
1112
1113    pub async fn list_worker_nodes(
1114        &self,
1115        worker_type: Option<WorkerType>,
1116    ) -> Result<Vec<WorkerNode>> {
1117        let request = ListAllNodesRequest {
1118            worker_type: worker_type.map(Into::into),
1119            include_starting_nodes: true,
1120        };
1121        let resp = self.inner.list_all_nodes(request).await?;
1122        Ok(resp.nodes)
1123    }
1124
1125    /// Starts a heartbeat worker.
1126    pub fn start_heartbeat_loop(
1127        meta_client: MetaClient,
1128        min_interval: Duration,
1129    ) -> (JoinHandle<()>, Sender<()>) {
1130        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1131        let join_handle = tokio::spawn(async move {
1132            let mut min_interval_ticker = tokio::time::interval(min_interval);
1133            loop {
1134                tokio::select! {
1135                    biased;
1136                    // Shutdown
1137                    _ = &mut shutdown_rx => {
1138                        tracing::info!("Heartbeat loop is stopped");
1139                        return;
1140                    }
1141                    // Wait for interval
1142                    _ = min_interval_ticker.tick() => {},
1143                }
1144                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
1145                match tokio::time::timeout(
1146                    // TODO: decide better min_interval for timeout
1147                    min_interval * 3,
1148                    meta_client.send_heartbeat(),
1149                )
1150                .await
1151                {
1152                    Ok(Ok(_)) => {}
1153                    Ok(Err(err)) => {
1154                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
1155                    }
1156                    Err(_) => {
1157                        tracing::warn!("Failed to send_heartbeat: timeout");
1158                    }
1159                }
1160            }
1161        });
1162        (join_handle, shutdown_tx)
1163    }
1164
1165    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1166        let request = RisectlListStateTablesRequest {};
1167        let resp = self.inner.risectl_list_state_tables(request).await?;
1168        Ok(resp.tables)
1169    }
1170
1171    pub async fn risectl_resume_backfill(
1172        &self,
1173        request: RisectlResumeBackfillRequest,
1174    ) -> Result<()> {
1175        self.inner.risectl_resume_backfill(request).await?;
1176        Ok(())
1177    }
1178
1179    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1180        let request = FlushRequest { database_id };
1181        let resp = self.inner.flush(request).await?;
1182        Ok(resp.hummock_version_id)
1183    }
1184
1185    pub async fn wait(&self, job_id: Option<JobId>) -> Result<WaitVersion> {
1186        let request = WaitRequest { job_id };
1187        let resp = self.inner.wait(request).await?;
1188        Ok(resp
1189            .version
1190            .ok_or_else(|| anyhow!("wait version not set"))?)
1191    }
1192
1193    pub async fn recover(&self) -> Result<()> {
1194        let request = RecoverRequest {};
1195        self.inner.recover(request).await?;
1196        Ok(())
1197    }
1198
1199    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1200        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1201        let resp = self.inner.cancel_creating_jobs(request).await?;
1202        Ok(resp.canceled_jobs)
1203    }
1204
1205    pub async fn list_table_fragments(
1206        &self,
1207        job_ids: &[JobId],
1208    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
1209        let request = ListTableFragmentsRequest {
1210            table_ids: job_ids.to_vec(),
1211        };
1212        let resp = self.inner.list_table_fragments(request).await?;
1213        Ok(resp.table_fragments)
1214    }
1215
1216    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1217        let resp = self
1218            .inner
1219            .list_streaming_job_states(ListStreamingJobStatesRequest {})
1220            .await?;
1221        Ok(resp.states)
1222    }
1223
1224    pub async fn list_fragment_distributions(
1225        &self,
1226        include_node: bool,
1227    ) -> Result<Vec<FragmentDistribution>> {
1228        let resp = self
1229            .inner
1230            .list_fragment_distribution(ListFragmentDistributionRequest {
1231                include_node: Some(include_node),
1232            })
1233            .await?;
1234        Ok(resp.distributions)
1235    }
1236
1237    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
1238        let resp = self
1239            .inner
1240            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {
1241                include_node: Some(true),
1242            })
1243            .await?;
1244        Ok(resp.distributions)
1245    }
1246
1247    pub async fn get_fragment_by_id(
1248        &self,
1249        fragment_id: FragmentId,
1250    ) -> Result<Option<FragmentDistribution>> {
1251        let resp = self
1252            .inner
1253            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1254            .await?;
1255        Ok(resp.distribution)
1256    }
1257
1258    pub async fn get_fragment_vnodes(
1259        &self,
1260        fragment_id: FragmentId,
1261    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
1262        let resp = self
1263            .inner
1264            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
1265            .await?;
1266        Ok(resp
1267            .actor_vnodes
1268            .into_iter()
1269            .map(|actor| (actor.actor_id, actor.vnode_indices))
1270            .collect())
1271    }
1272
1273    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
1274        let resp = self
1275            .inner
1276            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
1277            .await?;
1278        Ok(resp.vnode_indices)
1279    }
1280
1281    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1282        let resp = self
1283            .inner
1284            .list_actor_states(ListActorStatesRequest {})
1285            .await?;
1286        Ok(resp.states)
1287    }
1288
1289    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1290        let resp = self
1291            .inner
1292            .list_actor_splits(ListActorSplitsRequest {})
1293            .await?;
1294
1295        Ok(resp.actor_splits)
1296    }
1297
1298    pub async fn pause(&self) -> Result<PauseResponse> {
1299        let request = PauseRequest {};
1300        let resp = self.inner.pause(request).await?;
1301        Ok(resp)
1302    }
1303
1304    pub async fn resume(&self) -> Result<ResumeResponse> {
1305        let request = ResumeRequest {};
1306        let resp = self.inner.resume(request).await?;
1307        Ok(resp)
1308    }
1309
1310    pub async fn apply_throttle(
1311        &self,
1312        throttle_target: PbThrottleTarget,
1313        throttle_type: risingwave_pb::common::PbThrottleType,
1314        id: u32,
1315        rate: Option<u32>,
1316    ) -> Result<ApplyThrottleResponse> {
1317        let request = ApplyThrottleRequest {
1318            throttle_target: throttle_target as i32,
1319            throttle_type: throttle_type as i32,
1320            id,
1321            rate,
1322        };
1323        let resp = self.inner.apply_throttle(request).await?;
1324        Ok(resp)
1325    }
1326
1327    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1328        let resp = self
1329            .inner
1330            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1331            .await?;
1332        Ok(resp.get_status().unwrap())
1333    }
1334
1335    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1336        let request = GetClusterInfoRequest {};
1337        let resp = self.inner.get_cluster_info(request).await?;
1338        Ok(resp)
1339    }
1340
1341    pub async fn reschedule(
1342        &self,
1343        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1344        revision: u64,
1345        resolve_no_shuffle_upstream: bool,
1346    ) -> Result<(bool, u64)> {
1347        let request = RescheduleRequest {
1348            revision,
1349            resolve_no_shuffle_upstream,
1350            worker_reschedules,
1351        };
1352        let resp = self.inner.reschedule(request).await?;
1353        Ok((resp.success, resp.revision))
1354    }
1355
1356    pub async fn risectl_get_pinned_versions_summary(
1357        &self,
1358    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1359        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1360        self.inner
1361            .rise_ctl_get_pinned_versions_summary(request)
1362            .await
1363    }
1364
1365    pub async fn risectl_get_checkpoint_hummock_version(
1366        &self,
1367    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1368        let request = RiseCtlGetCheckpointVersionRequest {};
1369        self.inner.rise_ctl_get_checkpoint_version(request).await
1370    }
1371
1372    pub async fn risectl_pause_hummock_version_checkpoint(
1373        &self,
1374    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1375        let request = RiseCtlPauseVersionCheckpointRequest {};
1376        self.inner.rise_ctl_pause_version_checkpoint(request).await
1377    }
1378
1379    pub async fn risectl_resume_hummock_version_checkpoint(
1380        &self,
1381    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1382        let request = RiseCtlResumeVersionCheckpointRequest {};
1383        self.inner.rise_ctl_resume_version_checkpoint(request).await
1384    }
1385
1386    pub async fn init_metadata_for_replay(
1387        &self,
1388        tables: Vec<PbTable>,
1389        compaction_groups: Vec<CompactionGroupInfo>,
1390    ) -> Result<()> {
1391        let req = InitMetadataForReplayRequest {
1392            tables,
1393            compaction_groups,
1394        };
1395        let _resp = self.inner.init_metadata_for_replay(req).await?;
1396        Ok(())
1397    }
1398
1399    pub async fn replay_version_delta(
1400        &self,
1401        version_delta: HummockVersionDelta,
1402    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1403        let req = ReplayVersionDeltaRequest {
1404            version_delta: Some(version_delta.into()),
1405        };
1406        let resp = self.inner.replay_version_delta(req).await?;
1407        Ok((
1408            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1409            resp.modified_compaction_groups,
1410        ))
1411    }
1412
1413    pub async fn list_version_deltas(
1414        &self,
1415        start_id: HummockVersionId,
1416        num_limit: u32,
1417        committed_epoch_limit: HummockEpoch,
1418    ) -> Result<Vec<HummockVersionDelta>> {
1419        let req = ListVersionDeltasRequest {
1420            start_id,
1421            num_limit,
1422            committed_epoch_limit,
1423        };
1424        Ok(self
1425            .inner
1426            .list_version_deltas(req)
1427            .await?
1428            .version_deltas
1429            .unwrap()
1430            .version_deltas
1431            .iter()
1432            .map(HummockVersionDelta::from_rpc_protobuf)
1433            .collect())
1434    }
1435
1436    pub async fn trigger_compaction_deterministic(
1437        &self,
1438        version_id: HummockVersionId,
1439        compaction_groups: Vec<CompactionGroupId>,
1440    ) -> Result<()> {
1441        let req = TriggerCompactionDeterministicRequest {
1442            version_id,
1443            compaction_groups,
1444        };
1445        self.inner.trigger_compaction_deterministic(req).await?;
1446        Ok(())
1447    }
1448
1449    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1450        let req = DisableCommitEpochRequest {};
1451        Ok(HummockVersion::from_rpc_protobuf(
1452            &self
1453                .inner
1454                .disable_commit_epoch(req)
1455                .await?
1456                .current_version
1457                .unwrap(),
1458        ))
1459    }
1460
1461    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1462        let req = GetAssignedCompactTaskNumRequest {};
1463        let resp = self.inner.get_assigned_compact_task_num(req).await?;
1464        Ok(resp.num_tasks as usize)
1465    }
1466
1467    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1468        let req = RiseCtlListCompactionGroupRequest {};
1469        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1470        Ok(resp.compaction_groups)
1471    }
1472
1473    pub async fn risectl_update_compaction_config(
1474        &self,
1475        compaction_groups: &[CompactionGroupId],
1476        configs: &[MutableConfig],
1477    ) -> Result<()> {
1478        let req = RiseCtlUpdateCompactionConfigRequest {
1479            compaction_group_ids: compaction_groups.to_vec(),
1480            configs: configs
1481                .iter()
1482                .map(
1483                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1484                        mutable_config: Some(c.clone()),
1485                    },
1486                )
1487                .collect(),
1488        };
1489        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1490        Ok(())
1491    }
1492
1493    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1494        let req = BackupMetaRequest { remarks };
1495        let resp = self.inner.backup_meta(req).await?;
1496        Ok(resp.job_id)
1497    }
1498
1499    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1500        let req = GetBackupJobStatusRequest { job_id };
1501        let resp = self.inner.get_backup_job_status(req).await?;
1502        Ok((resp.job_status(), resp.message))
1503    }
1504
1505    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1506        let req = DeleteMetaSnapshotRequest {
1507            snapshot_ids: snapshot_ids.to_vec(),
1508        };
1509        let _resp = self.inner.delete_meta_snapshot(req).await?;
1510        Ok(())
1511    }
1512
1513    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1514        let req = GetMetaSnapshotManifestRequest {};
1515        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1516        Ok(resp.manifest.expect("should exist"))
1517    }
1518
1519    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1520        let req = GetTelemetryInfoRequest {};
1521        let resp = self.inner.get_telemetry_info(req).await?;
1522        Ok(resp)
1523    }
1524
1525    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1526        let req = GetMetaStoreInfoRequest {};
1527        let resp = self.inner.get_meta_store_info(req).await?;
1528        Ok(resp.meta_store_endpoint)
1529    }
1530
1531    pub async fn alter_sink_props(
1532        &self,
1533        sink_id: SinkId,
1534        changed_props: BTreeMap<String, String>,
1535        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1536        connector_conn_ref: Option<ConnectionId>,
1537    ) -> Result<()> {
1538        let req = AlterConnectorPropsRequest {
1539            object_id: sink_id.as_raw_id(),
1540            changed_props: changed_props.into_iter().collect(),
1541            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1542            connector_conn_ref,
1543            object_type: AlterConnectorPropsObject::Sink as i32,
1544            extra_options: None,
1545        };
1546        let _resp = self.inner.alter_connector_props(req).await?;
1547        Ok(())
1548    }
1549
1550    pub async fn alter_iceberg_table_props(
1551        &self,
1552        table_id: TableId,
1553        sink_id: SinkId,
1554        source_id: SourceId,
1555        changed_props: BTreeMap<String, String>,
1556        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1557        connector_conn_ref: Option<ConnectionId>,
1558    ) -> Result<()> {
1559        let req = AlterConnectorPropsRequest {
1560            object_id: table_id.as_raw_id(),
1561            changed_props: changed_props.into_iter().collect(),
1562            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1563            connector_conn_ref,
1564            object_type: AlterConnectorPropsObject::IcebergTable as i32,
1565            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1566                sink_id,
1567                source_id,
1568            })),
1569        };
1570        let _resp = self.inner.alter_connector_props(req).await?;
1571        Ok(())
1572    }
1573
1574    pub async fn alter_source_connector_props(
1575        &self,
1576        source_id: SourceId,
1577        changed_props: BTreeMap<String, String>,
1578        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1579        connector_conn_ref: Option<ConnectionId>,
1580    ) -> Result<()> {
1581        let req = AlterConnectorPropsRequest {
1582            object_id: source_id.as_raw_id(),
1583            changed_props: changed_props.into_iter().collect(),
1584            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1585            connector_conn_ref,
1586            object_type: AlterConnectorPropsObject::Source as i32,
1587            extra_options: None,
1588        };
1589        let _resp = self.inner.alter_connector_props(req).await?;
1590        Ok(())
1591    }
1592
1593    pub async fn alter_connection_connector_props(
1594        &self,
1595        connection_id: u32,
1596        changed_props: BTreeMap<String, String>,
1597        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1598    ) -> Result<()> {
1599        let req = AlterConnectorPropsRequest {
1600            object_id: connection_id,
1601            changed_props: changed_props.into_iter().collect(),
1602            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1603            connector_conn_ref: None, // Connections don't reference other connections
1604            object_type: AlterConnectorPropsObject::Connection as i32,
1605            extra_options: None,
1606        };
1607        let _resp = self.inner.alter_connector_props(req).await?;
1608        Ok(())
1609    }
1610
1611    /// Orchestrated source property update with pause/update/resume workflow.
1612    /// This is the "safe" version that pauses sources before updating and resumes after.
1613    pub async fn alter_source_properties_safe(
1614        &self,
1615        source_id: SourceId,
1616        changed_props: BTreeMap<String, String>,
1617        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1618        reset_splits: bool,
1619    ) -> Result<()> {
1620        let req = AlterSourcePropertiesSafeRequest {
1621            source_id: source_id.as_raw_id(),
1622            changed_props: changed_props.into_iter().collect(),
1623            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1624            options: Some(PropertyUpdateOptions { reset_splits }),
1625        };
1626        let _resp = self.inner.alter_source_properties_safe(req).await?;
1627        Ok(())
1628    }
1629
1630    /// Reset source split assignments (UNSAFE - admin only).
1631    /// This clears persisted split metadata and triggers re-discovery.
1632    pub async fn reset_source_splits(&self, source_id: SourceId) -> Result<()> {
1633        let req = ResetSourceSplitsRequest {
1634            source_id: source_id.as_raw_id(),
1635        };
1636        let _resp = self.inner.reset_source_splits(req).await?;
1637        Ok(())
1638    }
1639
1640    /// Inject specific offsets into source splits (UNSAFE - admin only).
1641    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
1642    pub async fn inject_source_offsets(
1643        &self,
1644        source_id: SourceId,
1645        split_offsets: HashMap<String, String>,
1646    ) -> Result<Vec<String>> {
1647        let req = InjectSourceOffsetsRequest {
1648            source_id: source_id.as_raw_id(),
1649            split_offsets,
1650        };
1651        let resp = self.inner.inject_source_offsets(req).await?;
1652        Ok(resp.applied_split_ids)
1653    }
1654
1655    pub async fn set_system_param(
1656        &self,
1657        param: String,
1658        value: Option<String>,
1659    ) -> Result<Option<SystemParamsReader>> {
1660        let req = SetSystemParamRequest { param, value };
1661        let resp = self.inner.set_system_param(req).await?;
1662        Ok(resp.params.map(SystemParamsReader::from))
1663    }
1664
1665    pub async fn get_session_params(&self) -> Result<String> {
1666        let req = GetSessionParamsRequest {};
1667        let resp = self.inner.get_session_params(req).await?;
1668        Ok(resp.params)
1669    }
1670
1671    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1672        let req = SetSessionParamRequest { param, value };
1673        let resp = self.inner.set_session_param(req).await?;
1674        Ok(resp.param)
1675    }
1676
1677    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1678        let req = GetDdlProgressRequest {};
1679        let resp = self.inner.get_ddl_progress(req).await?;
1680        Ok(resp.ddl_progress)
1681    }
1682
1683    pub async fn split_compaction_group(
1684        &self,
1685        group_id: CompactionGroupId,
1686        table_ids_to_new_group: &[TableId],
1687        partition_vnode_count: u32,
1688    ) -> Result<CompactionGroupId> {
1689        let req = SplitCompactionGroupRequest {
1690            group_id,
1691            table_ids: table_ids_to_new_group.to_vec(),
1692            partition_vnode_count,
1693        };
1694        let resp = self.inner.split_compaction_group(req).await?;
1695        Ok(resp.new_group_id)
1696    }
1697
1698    pub async fn get_tables(
1699        &self,
1700        table_ids: Vec<TableId>,
1701        include_dropped_tables: bool,
1702    ) -> Result<HashMap<TableId, Table>> {
1703        let req = GetTablesRequest {
1704            table_ids,
1705            include_dropped_tables,
1706        };
1707        let resp = self.inner.get_tables(req).await?;
1708        Ok(resp.tables)
1709    }
1710
1711    pub async fn list_serving_vnode_mappings(
1712        &self,
1713    ) -> Result<HashMap<FragmentId, (JobId, WorkerSlotMapping)>> {
1714        let req = GetServingVnodeMappingsRequest {};
1715        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1716        let mappings = resp
1717            .worker_slot_mappings
1718            .into_iter()
1719            .map(|p| {
1720                (
1721                    p.fragment_id,
1722                    (
1723                        resp.fragment_to_table
1724                            .get(&p.fragment_id)
1725                            .cloned()
1726                            .unwrap_or_default(),
1727                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1728                    ),
1729                )
1730            })
1731            .collect();
1732        Ok(mappings)
1733    }
1734
1735    pub async fn risectl_list_compaction_status(
1736        &self,
1737    ) -> Result<(
1738        Vec<CompactStatus>,
1739        Vec<CompactTaskAssignment>,
1740        Vec<CompactTaskProgress>,
1741    )> {
1742        let req = RiseCtlListCompactionStatusRequest {};
1743        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1744        Ok((
1745            resp.compaction_statuses,
1746            resp.task_assignment,
1747            resp.task_progress,
1748        ))
1749    }
1750
1751    pub async fn get_compaction_score(
1752        &self,
1753        compaction_group_id: CompactionGroupId,
1754    ) -> Result<Vec<PickerInfo>> {
1755        let req = GetCompactionScoreRequest {
1756            compaction_group_id,
1757        };
1758        let resp = self.inner.get_compaction_score(req).await?;
1759        Ok(resp.scores)
1760    }
1761
1762    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1763        let req = RiseCtlRebuildTableStatsRequest {};
1764        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1765        Ok(())
1766    }
1767
1768    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1769        let req = ListBranchedObjectRequest {};
1770        let resp = self.inner.list_branched_object(req).await?;
1771        Ok(resp.branched_objects)
1772    }
1773
1774    pub async fn list_active_write_limit(&self) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
1775        let req = ListActiveWriteLimitRequest {};
1776        let resp = self.inner.list_active_write_limit(req).await?;
1777        Ok(resp.write_limits)
1778    }
1779
1780    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1781        let req = ListHummockMetaConfigRequest {};
1782        let resp = self.inner.list_hummock_meta_config(req).await?;
1783        Ok(resp.configs)
1784    }
1785
1786    pub async fn get_table_change_logs(
1787        &self,
1788        epoch_only: bool,
1789        start_epoch_inclusive: Option<u64>,
1790        end_epoch_inclusive: Option<u64>,
1791        table_ids: Option<HashSet<TableId>>,
1792        exclude_empty: bool,
1793        limit: Option<u32>,
1794    ) -> Result<TableChangeLogs> {
1795        let req = GetTableChangeLogsRequest {
1796            epoch_only,
1797            start_epoch_inclusive,
1798            end_epoch_inclusive,
1799            table_ids: table_ids.map(|iter| PbTableFilter {
1800                table_ids: iter.into_iter().collect::<Vec<_>>(),
1801            }),
1802            exclude_empty,
1803            limit,
1804        };
1805        let resp = self.inner.get_table_change_logs(req).await?;
1806        Ok(resp
1807            .table_change_logs
1808            .into_iter()
1809            .map(|(id, change_log)| (TableId::new(id), TableChangeLog::from_protobuf(&change_log)))
1810            .collect())
1811    }
1812
1813    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1814        let _resp = self
1815            .inner
1816            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1817            .await?;
1818
1819        Ok(())
1820    }
1821
1822    pub async fn rw_cloud_validate_source(
1823        &self,
1824        source_type: SourceType,
1825        source_config: HashMap<String, String>,
1826    ) -> Result<RwCloudValidateSourceResponse> {
1827        let req = RwCloudValidateSourceRequest {
1828            source_type: source_type.into(),
1829            source_config,
1830        };
1831        let resp = self.inner.rw_cloud_validate_source(req).await?;
1832        Ok(resp)
1833    }
1834
1835    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1836        self.inner.core.read().await.sink_coordinate_client.clone()
1837    }
1838
1839    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1840        let req = ListCompactTaskAssignmentRequest {};
1841        let resp = self.inner.list_compact_task_assignment(req).await?;
1842        Ok(resp.task_assignment)
1843    }
1844
1845    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1846        let req = ListEventLogRequest::default();
1847        let resp = self.inner.list_event_log(req).await?;
1848        Ok(resp.event_logs)
1849    }
1850
1851    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1852        let req = ListCompactTaskProgressRequest {};
1853        let resp = self.inner.list_compact_task_progress(req).await?;
1854        Ok(resp.task_progress)
1855    }
1856
    /// Variant compiled under `cfg(madsim)`: a no-op with the same signature as
    /// the non-madsim implementation. Both arguments are ignored.
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }
1864
1865    /// If `timeout_millis` is None, default is used.
1866    #[cfg(not(madsim))]
1867    pub fn try_add_panic_event_blocking(
1868        &self,
1869        panic_info: impl Display,
1870        timeout_millis: Option<u64>,
1871    ) {
1872        let event = event_log::EventWorkerNodePanic {
1873            worker_id: self.worker_id,
1874            worker_type: self.worker_type.into(),
1875            host_addr: Some(self.host_addr.to_protobuf()),
1876            panic_info: format!("{panic_info}"),
1877        };
1878        let grpc_meta_client = self.inner.clone();
1879        let _ = thread::spawn(move || {
1880            let rt = tokio::runtime::Builder::new_current_thread()
1881                .enable_all()
1882                .build()
1883                .unwrap();
1884            let req = AddEventLogRequest {
1885                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1886            };
1887            rt.block_on(async {
1888                let _ = tokio::time::timeout(
1889                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1890                    grpc_meta_client.add_event_log(req),
1891                )
1892                .await;
1893            });
1894        })
1895        .join();
1896    }
1897
1898    pub async fn add_sink_fail_evet(
1899        &self,
1900        sink_id: SinkId,
1901        sink_name: String,
1902        connector: String,
1903        error: String,
1904    ) -> Result<()> {
1905        let event = event_log::EventSinkFail {
1906            sink_id,
1907            sink_name,
1908            connector,
1909            error,
1910        };
1911        let req = AddEventLogRequest {
1912            event: Some(add_event_log_request::Event::SinkFail(event)),
1913        };
1914        self.inner.add_event_log(req).await?;
1915        Ok(())
1916    }
1917
1918    pub async fn add_cdc_auto_schema_change_fail_event(
1919        &self,
1920        source_id: SourceId,
1921        table_name: String,
1922        cdc_table_id: String,
1923        upstream_ddl: String,
1924        fail_info: String,
1925    ) -> Result<()> {
1926        let event = event_log::EventAutoSchemaChangeFail {
1927            table_id: source_id.as_cdc_table_id(),
1928            table_name,
1929            cdc_table_id,
1930            upstream_ddl,
1931            fail_info,
1932        };
1933        let req = AddEventLogRequest {
1934            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1935        };
1936        self.inner.add_event_log(req).await?;
1937        Ok(())
1938    }
1939
1940    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1941        let req = CancelCompactTaskRequest {
1942            task_id,
1943            task_status: task_status as _,
1944        };
1945        let resp = self.inner.cancel_compact_task(req).await?;
1946        Ok(resp.ret)
1947    }
1948
1949    pub async fn get_version_by_epoch(
1950        &self,
1951        epoch: HummockEpoch,
1952        table_id: TableId,
1953    ) -> Result<PbHummockVersion> {
1954        let req = GetVersionByEpochRequest { epoch, table_id };
1955        let resp = self.inner.get_version_by_epoch(req).await?;
1956        Ok(resp.version.unwrap())
1957    }
1958
1959    pub async fn get_cluster_limits(
1960        &self,
1961    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1962        let req = GetClusterLimitsRequest {};
1963        let resp = self.inner.get_cluster_limits(req).await?;
1964        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1965    }
1966
1967    pub async fn merge_compaction_group(
1968        &self,
1969        left_group_id: CompactionGroupId,
1970        right_group_id: CompactionGroupId,
1971    ) -> Result<()> {
1972        let req = MergeCompactionGroupRequest {
1973            left_group_id,
1974            right_group_id,
1975        };
1976        self.inner.merge_compaction_group(req).await?;
1977        Ok(())
1978    }
1979
1980    /// List all rate limits for sources and backfills
1981    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1982        let request = ListRateLimitsRequest {};
1983        let resp = self.inner.list_rate_limits(request).await?;
1984        Ok(resp.rate_limits)
1985    }
1986
1987    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1988        let request = ListCdcProgressRequest {};
1989        let resp = self.inner.list_cdc_progress(request).await?;
1990        Ok(resp.cdc_progress)
1991    }
1992
1993    pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1994        let request = ListRefreshTableStatesRequest {};
1995        let resp = self.inner.list_refresh_table_states(request).await?;
1996        Ok(resp.states)
1997    }
1998
1999    pub async fn list_iceberg_compaction_status(
2000        &self,
2001    ) -> Result<Vec<list_iceberg_compaction_status_response::IcebergCompactionStatus>> {
2002        let request = ListIcebergCompactionStatusRequest {};
2003        let resp = self.inner.list_iceberg_compaction_status(request).await?;
2004        Ok(resp.statuses)
2005    }
2006
2007    pub async fn list_sink_log_store_tables(
2008        &self,
2009    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
2010        let request = ListSinkLogStoreTablesRequest {};
2011        let resp = self.inner.list_sink_log_store_tables(request).await?;
2012        Ok(resp.tables)
2013    }
2014
2015    pub async fn create_iceberg_table(
2016        &self,
2017        table_job_info: PbTableJobInfo,
2018        sink_job_info: PbSinkJobInfo,
2019        iceberg_source: PbSource,
2020        if_not_exists: bool,
2021    ) -> Result<WaitVersion> {
2022        let request = CreateIcebergTableRequest {
2023            table_info: Some(table_job_info),
2024            sink_info: Some(sink_job_info),
2025            iceberg_source: Some(iceberg_source),
2026            if_not_exists,
2027        };
2028
2029        let resp = Box::pin(self.inner.create_iceberg_table(request)).await?;
2030        Ok(resp
2031            .version
2032            .ok_or_else(|| anyhow!("wait version not set"))?)
2033    }
2034
2035    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
2036        let request = ListIcebergTablesRequest {};
2037        let resp = self.inner.list_iceberg_tables(request).await?;
2038        Ok(resp.iceberg_tables)
2039    }
2040
2041    /// Get the await tree of all nodes in the cluster.
2042    pub async fn get_cluster_stack_trace(
2043        &self,
2044        actor_traces_format: ActorTracesFormat,
2045    ) -> Result<StackTraceResponse> {
2046        let request = StackTraceRequest {
2047            actor_traces_format: actor_traces_format as i32,
2048        };
2049        let resp = self.inner.stack_trace(request).await?;
2050        Ok(resp)
2051    }
2052
2053    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
2054        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
2055        self.inner.set_sync_log_store_aligned(request).await?;
2056        Ok(())
2057    }
2058
    /// Forwards `request` to the inner client's `refresh` RPC and returns its
    /// result unchanged.
    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
        self.inner.refresh(request).await
    }
2062
2063    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
2064        let request = ListUnmigratedTablesRequest {};
2065        let resp = self.inner.list_unmigrated_tables(request).await?;
2066        Ok(resp
2067            .tables
2068            .into_iter()
2069            .map(|table| (table.table_id, table.table_name))
2070            .collect())
2071    }
2072}
2073
2074#[async_trait]
2075impl HummockMetaClient for MetaClient {
2076    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
2077        let req = UnpinVersionBeforeRequest {
2078            context_id: self.worker_id(),
2079            unpin_version_before,
2080        };
2081        self.inner.unpin_version_before(req).await?;
2082        Ok(())
2083    }
2084
2085    async fn get_current_version(&self) -> Result<HummockVersion> {
2086        let req = GetCurrentVersionRequest::default();
2087        Ok(HummockVersion::from_rpc_protobuf(
2088            &self
2089                .inner
2090                .get_current_version(req)
2091                .await?
2092                .current_version
2093                .unwrap(),
2094        ))
2095    }
2096
2097    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
2098        let resp = self
2099            .inner
2100            .get_new_object_ids(GetNewObjectIdsRequest { number })
2101            .await?;
2102        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
2103    }
2104
2105    async fn commit_epoch_with_change_log(
2106        &self,
2107        _epoch: HummockEpoch,
2108        _sync_result: SyncResult,
2109        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
2110    ) -> Result<()> {
2111        panic!("Only meta service can commit_epoch in production.")
2112    }
2113
2114    async fn trigger_manual_compaction(
2115        &self,
2116        compaction_group_id: CompactionGroupId,
2117        table_id: JobId,
2118        level: u32,
2119        target_level: Option<u32>,
2120        sst_ids: Vec<HummockSstableId>,
2121        exclusive: bool,
2122    ) -> Result<bool> {
2123        // TODO: support key_range parameter
2124        let req = TriggerManualCompactionRequest {
2125            compaction_group_id,
2126            table_id,
2127            // if table_id not exist, manual_compaction will include all the sst
2128            // without check internal_table_id
2129            level,
2130            target_level,
2131            sst_ids,
2132            exclusive: Some(exclusive),
2133            ..Default::default()
2134        };
2135
2136        let resp = self.inner.trigger_manual_compaction(req).await?;
2137        Ok(resp.should_retry.unwrap_or(false))
2138    }
2139
2140    async fn trigger_full_gc(
2141        &self,
2142        sst_retention_time_sec: u64,
2143        prefix: Option<String>,
2144    ) -> Result<()> {
2145        self.inner
2146            .trigger_full_gc(TriggerFullGcRequest {
2147                sst_retention_time_sec,
2148                prefix,
2149            })
2150            .await?;
2151        Ok(())
2152    }
2153
2154    async fn subscribe_compaction_event(
2155        &self,
2156    ) -> Result<(
2157        UnboundedSender<SubscribeCompactionEventRequest>,
2158        BoxStream<'static, CompactionEventItem>,
2159    )> {
2160        let (request_sender, request_receiver) =
2161            unbounded_channel::<SubscribeCompactionEventRequest>();
2162        request_sender
2163            .send(SubscribeCompactionEventRequest {
2164                event: Some(subscribe_compaction_event_request::Event::Register(
2165                    Register {
2166                        context_id: self.worker_id(),
2167                    },
2168                )),
2169                create_at: SystemTime::now()
2170                    .duration_since(SystemTime::UNIX_EPOCH)
2171                    .expect("Clock may have gone backwards")
2172                    .as_millis() as u64,
2173            })
2174            .context("Failed to subscribe compaction event")?;
2175
2176        let stream = self
2177            .inner
2178            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
2179                request_receiver,
2180            )))
2181            .await?;
2182
2183        Ok((request_sender, Box::pin(stream)))
2184    }
2185
2186    async fn get_version_by_epoch(
2187        &self,
2188        epoch: HummockEpoch,
2189        table_id: TableId,
2190    ) -> Result<PbHummockVersion> {
2191        self.get_version_by_epoch(epoch, table_id).await
2192    }
2193
2194    async fn subscribe_iceberg_compaction_event(
2195        &self,
2196    ) -> Result<(
2197        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2198        BoxStream<'static, IcebergCompactionEventItem>,
2199    )> {
2200        let (request_sender, request_receiver) =
2201            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2202        request_sender
2203            .send(SubscribeIcebergCompactionEventRequest {
2204                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2205                    IcebergRegister {
2206                        context_id: self.worker_id(),
2207                    },
2208                )),
2209                create_at: SystemTime::now()
2210                    .duration_since(SystemTime::UNIX_EPOCH)
2211                    .expect("Clock may have gone backwards")
2212                    .as_millis() as u64,
2213            })
2214            .context("Failed to subscribe compaction event")?;
2215
2216        let stream = self
2217            .inner
2218            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2219                request_receiver,
2220            )))
2221            .await?;
2222
2223        Ok((request_sender, Box::pin(stream)))
2224    }
2225
2226    async fn get_table_change_logs(
2227        &self,
2228        epoch_only: bool,
2229        start_epoch_inclusive: Option<u64>,
2230        end_epoch_inclusive: Option<u64>,
2231        table_ids: Option<HashSet<TableId>>,
2232        exclude_empty: bool,
2233        limit: Option<u32>,
2234    ) -> Result<TableChangeLogs> {
2235        self.get_table_change_logs(
2236            epoch_only,
2237            start_epoch_inclusive,
2238            end_epoch_inclusive,
2239            table_ids,
2240            exclude_empty,
2241            limit,
2242        )
2243        .await
2244    }
2245}
2246
2247#[async_trait]
2248impl TelemetryInfoFetcher for MetaClient {
2249    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2250        let resp = self
2251            .get_telemetry_info()
2252            .await
2253            .map_err(|e| e.to_report_string())?;
2254        let tracking_id = resp.get_tracking_id().ok();
2255        Ok(tracking_id.map(|id| id.to_owned()))
2256    }
2257}
2258
/// The sink coordination service client specialized to a [`Channel`] transport.
pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2260
/// The set of per-service gRPC clients used to talk to the meta server.
///
/// All clients are created together from one [`Channel`] in
/// [`GrpcMetaClientCore::new`], so replacing the core swaps every service
/// client at once.
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}
2284
2285impl GrpcMetaClientCore {
2286    pub(crate) fn new(channel: Channel) -> Self {
2287        let cluster_client = ClusterServiceClient::new(channel.clone());
2288        let meta_member_client = MetaMemberClient::new(channel.clone());
2289        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2290        let ddl_client =
2291            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2292        let hummock_client =
2293            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2294        let notification_client =
2295            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2296        let stream_client =
2297            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2298        let user_client = UserServiceClient::new(channel.clone());
2299        let scale_client =
2300            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2301        let backup_client = BackupServiceClient::new(channel.clone());
2302        let telemetry_client =
2303            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2304        let system_params_client = SystemParamsServiceClient::new(channel.clone());
2305        let session_params_client = SessionParamServiceClient::new(channel.clone());
2306        let serving_client = ServingServiceClient::new(channel.clone());
2307        let cloud_client = CloudServiceClient::new(channel.clone());
2308        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2309            .max_decoding_message_size(usize::MAX);
2310        let event_log_client = EventLogServiceClient::new(channel.clone());
2311        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2312        let hosted_iceberg_catalog_service_client =
2313            HostedIcebergCatalogServiceClient::new(channel.clone());
2314        let monitor_client = configured_monitor_service_client(MonitorServiceClient::new(channel));
2315
2316        GrpcMetaClientCore {
2317            cluster_client,
2318            meta_member_client,
2319            heartbeat_client,
2320            ddl_client,
2321            hummock_client,
2322            notification_client,
2323            stream_client,
2324            user_client,
2325            scale_client,
2326            backup_client,
2327            telemetry_client,
2328            system_params_client,
2329            session_params_client,
2330            serving_client,
2331            cloud_client,
2332            sink_coordinate_client,
2333            event_log_client,
2334            cluster_limit_client,
2335            hosted_iceberg_catalog_service_client,
2336            monitor_client,
2337        }
2338    }
2339}
2340
/// Client to meta server. Cloning the instance is lightweight.
///
/// It is a wrapper of tonic client. See [`crate::meta_rpc_client_method_impl`].
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    // Channel over which result senders are handed to the member monitor.
    // NOTE(review): presumably the sending half of the monitor's
    // `force_refresh_receiver` — confirm at the construction site.
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    // Shared, lock-protected service clients; the member monitor holds a clone
    // of this `Arc` and replaces the core when a new leader is discovered.
    core: Arc<RwLock<GrpcMetaClientCore>>,
}
2349
/// The meta member service client specialized to a [`Channel`] transport.
type MetaMemberClient = MetaMemberServiceClient<Channel>;
2351
/// Known meta members, keyed by address.
struct MetaMemberGroup {
    // `None` marks a member whose client has not been created yet; it is
    // filled in on first use by `MetaMemberManagement::refresh_members`.
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}
2355
/// State used to track meta members and follow leader changes.
struct MetaMemberManagement {
    // Shared core whose clients are rebuilt when the leader changes.
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    // Either a single member client (`Left`) or a group of candidate member
    // addresses with lazily created clients (`Right`).
    members: Either<MetaMemberClient, MetaMemberGroup>,
    // Address of the leader the core is currently connected to.
    current_leader: http::Uri,
    // Supplies `meta_leader_lease_secs`, which bounds reconnect retries.
    meta_config: Arc<MetaConfig>,
}
2362
impl MetaMemberManagement {
    /// Period of the member monitor's refresh ticker.
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

    /// Builds an `http://host:port` URI from a `HostAddress`.
    ///
    /// Panics if the formatted string is not a valid URI.
    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
        format!("http://{}:{}", addr.host, addr.port)
            .parse()
            .unwrap()
    }

    /// Replaces the shared core with a fresh one built on `channel`, holding
    /// the core's write lock for the duration of the swap.
    async fn recreate_core(&self, channel: Channel) {
        let mut core = self.core_ref.write().await;
        *core = GrpcMetaClientCore::new(channel);
    }

    /// Refreshes the member list and, if the leader changed, reconnects the
    /// shared core to the new leader.
    ///
    /// Returns an error when the member list cannot be fetched or when
    /// connecting to a newly discovered leader keeps failing within the retry
    /// bound. Panics if a returned member has no `address`.
    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            // Single client: ask it for the members and pick the leader.
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            // Member group: try each known member until one answers.
            Either::Right(member_group) => {
                // Holds the outcome of the most recent attempt; stays `None`
                // only when the group is empty.
                let mut fetched_members = None;

                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                // No client yet for this address: connect and
                                // cache the new client in the group.
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")
                                    .map_err(RpcError::from)?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")
                            .map_err(RpcError::from)?;

                        resp.into_inner().members
                    };

                    // Stop at the first member that answers; otherwise keep
                    // the error and move on to the next member.
                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                // First `context`: the group was empty. Second `context`: the
                // last attempt failed, i.e. no member answered.
                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                // find new leader
                // (if several members claim leadership, the last one wins)
                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    // We don't clean any expired addrs here to deal with some extreme situations.
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            // Only reconnect when the leader actually changed.
            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                // Retries are bounded by the configured leader lease duration.
                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                // Swap the clients first, then record the new leader.
                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}
2473
2474impl GrpcMetaClient {
2475    // See `Endpoint::http2_keep_alive_interval`
2476    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2477    // See `Endpoint::keep_alive_timeout`
2478    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2479    // Retry base interval in ms for connecting to meta server.
2480    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    // Max retry interval in ms for connecting to meta server.
2482    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2483
2484    fn start_meta_member_monitor(
2485        &self,
2486        init_leader_addr: http::Uri,
2487        members: Either<MetaMemberClient, MetaMemberGroup>,
2488        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2489        meta_config: Arc<MetaConfig>,
2490    ) -> Result<()> {
2491        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2492        let current_leader = init_leader_addr;
2493
2494        let enable_period_tick = matches!(members, Either::Right(_));
2495
2496        let member_management = MetaMemberManagement {
2497            core_ref,
2498            members,
2499            current_leader,
2500            meta_config,
2501        };
2502
2503        let mut force_refresh_receiver = force_refresh_receiver;
2504
2505        tokio::spawn(async move {
2506            let mut member_management = member_management;
2507            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2508
2509            loop {
2510                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2511                    tokio::select! {
2512                        _ = ticker.tick() => None,
2513                        result_sender = force_refresh_receiver.recv() => {
2514                            if result_sender.is_none() {
2515                                break;
2516                            }
2517
2518                            result_sender
2519                        },
2520                    }
2521                } else {
2522                    let result_sender = force_refresh_receiver.recv().await;
2523
2524                    if result_sender.is_none() {
2525                        break;
2526                    }
2527
2528                    result_sender
2529                };
2530
2531                let tick_result = member_management.refresh_members().await;
2532                if let Err(e) = tick_result.as_ref() {
2533                    tracing::warn!(error = %e.as_report(),  "refresh meta member client failed");
2534                }
2535
2536                if let Some(sender) = event {
2537                    // ignore resp
2538                    let _resp = sender.send(tick_result);
2539                }
2540            }
2541        });
2542
2543        Ok(())
2544    }
2545
2546    async fn force_refresh_leader(&self) -> Result<()> {
2547        let (sender, receiver) = oneshot::channel();
2548
2549        self.member_monitor_event_sender
2550            .send(sender)
2551            .await
2552            .map_err(|e| anyhow!(e))?;
2553
2554        receiver.await.map_err(|e| anyhow!(e))?
2555    }
2556
2557    /// Connect to the meta server from `addrs`.
2558    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2559        let (channel, addr) = match strategy {
2560            MetaAddressStrategy::LoadBalance(addr) => {
2561                Self::try_build_rpc_channel(vec![addr.clone()]).await
2562            }
2563            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2564        }?;
2565        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2566        let client = GrpcMetaClient {
2567            member_monitor_event_sender: force_refresh_sender,
2568            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2569        };
2570
2571        let meta_member_client = client.core.read().await.meta_member_client.clone();
2572        let members = match strategy {
2573            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2574            MetaAddressStrategy::List(addrs) => {
2575                let mut members = LruCache::new(20.try_into().unwrap());
2576                for addr in addrs {
2577                    members.put(addr.clone(), None);
2578                }
2579                members.put(addr.clone(), Some(meta_member_client));
2580
2581                Either::Right(MetaMemberGroup { members })
2582            }
2583        };
2584
2585        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2586
2587        client.force_refresh_leader().await?;
2588
2589        Ok(client)
2590    }
2591
2592    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2593        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2594    }
2595
2596    pub(crate) async fn try_build_rpc_channel(
2597        addrs: impl IntoIterator<Item = http::Uri>,
2598    ) -> Result<(Channel, http::Uri)> {
2599        let endpoints: Vec<_> = addrs
2600            .into_iter()
2601            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2602            .collect();
2603
2604        let mut last_error = None;
2605
2606        for (endpoint, addr) in endpoints {
2607            match Self::connect_to_endpoint(endpoint).await {
2608                Ok(channel) => {
2609                    tracing::info!("Connect to meta server {} successfully", addr);
2610                    return Ok((channel, addr));
2611                }
2612                Err(e) => {
2613                    tracing::warn!(
2614                        error = %e.as_report(),
2615                        "Failed to connect to meta server {}, trying again",
2616                        addr,
2617                    );
2618                    last_error = Some(e);
2619                }
2620            }
2621        }
2622
2623        if let Some(last_error) = last_error {
2624            Err(anyhow::anyhow!(last_error)
2625                .context("failed to connect to all meta servers")
2626                .into())
2627        } else {
2628            bail!("no meta server address provided")
2629        }
2630    }
2631
2632    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2633        let channel = endpoint
2634            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2635            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2636            .connect_timeout(Duration::from_secs(5))
2637            .monitored_connect("grpc-meta-client", Default::default())
2638            .await?
2639            .wrapped();
2640
2641        Ok(channel)
2642    }
2643
2644    pub(crate) fn retry_strategy_to_bound(
2645        high_bound: Duration,
2646        exceed: bool,
2647    ) -> impl Iterator<Item = Duration> {
2648        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2649            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2650            .map(jitter);
2651
2652        let mut sum = Duration::default();
2653
2654        iter.take_while(move |duration| {
2655            sum += *duration;
2656
2657            if exceed {
2658                sum < high_bound + *duration
2659            } else {
2660                sum < high_bound
2661            }
2662        })
2663    }
2664}
2665
/// Invokes the macro named by `$macro` with the complete table of meta RPCs.
///
/// Every table entry has the shape `{ client, method, Request, Response }`. Judging by the
/// names, these are the client the RPC is issued on, the RPC method, its request type and
/// its response type. Streaming RPCs put `impl tonic::IntoStreamingRequest<..>` in the
/// request position and/or `Streaming<..>` in the response position.
///
/// In this file the table is consumed by
/// `for_all_meta_rpc! { meta_rpc_client_method_impl }` inside `impl GrpcMetaClient`; what is
/// generated per entry is decided entirely by that callback macro.
///
/// NOTE(review): `client` presumably names a field of `GrpcMetaClientCore` — confirm
/// against the callback macro before adding an entry.
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_sink_log_store_tables, ListSinkLogStoreTablesRequest, ListSinkLogStoreTablesResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
            ,{ stream_client, list_iceberg_compaction_status, ListIcebergCompactionStatusRequest, ListIcebergCompactionStatusResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, alter_source_properties_safe, AlterSourcePropertiesSafeRequest, AlterSourcePropertiesSafeResponse }
            ,{ stream_client, reset_source_splits, ResetSourceSplitsRequest, ResetSourceSplitsResponse }
            ,{ stream_client, inject_source_offsets, InjectSourceOffsetsRequest, InjectSourceOffsetsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_subscription_retention, AlterSubscriptionRetentionRequest, AlterSubscriptionRetentionResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_backfill_parallelism, AlterBackfillParallelismRequest, AlterBackfillParallelismResponse }
            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_resource_group, AlterDatabaseResourceGroupRequest, AlterDatabaseResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, risectl_resume_backfill, RisectlResumeBackfillRequest, RisectlResumeBackfillResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ hummock_client, get_table_change_logs, GetTableChangeLogsRequest, GetTableChangeLogsResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}
2815
2816impl GrpcMetaClient {
2817    async fn refresh_client_if_needed(&self, code: Code) {
2818        if matches!(
2819            code,
2820            Code::Unknown | Code::Unimplemented | Code::Unavailable
2821        ) {
2822            tracing::debug!("matching tonic code {}", code);
2823            let (result_sender, result_receiver) = oneshot::channel();
2824            if self
2825                .member_monitor_event_sender
2826                .try_send(result_sender)
2827                .is_ok()
2828            {
2829                if let Ok(Err(e)) = result_receiver.await {
2830                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2831                }
2832            } else {
2833                tracing::debug!("skipping the current refresh, somewhere else is already doing it")
2834            }
2835        }
2836    }
2837}
2838
impl GrpcMetaClient {
    // Expands `meta_rpc_client_method_impl` over every entry of the `for_all_meta_rpc`
    // table, inside this `impl` block.
    // NOTE(review): presumably this yields one method per RPC entry — confirm against the
    // definition of `meta_rpc_client_method_impl`.
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}