risingwave_rpc_client/
meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{
33    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
34};
35use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
36use risingwave_common::hash::WorkerSlotMapping;
37use risingwave_common::monitor::EndpointExt;
38use risingwave_common::system_param::reader::SystemParamsReader;
39use risingwave_common::telemetry::report::TelemetryInfoFetcher;
40use risingwave_common::util::addr::HostAddr;
41use risingwave_common::util::meta_addr::MetaAddressStrategy;
42use risingwave_common::util::resource_util::cpu::total_cpu_available;
43use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
44use risingwave_error::bail;
45use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
46use risingwave_hummock_sdk::compaction_group::StateTableId;
47use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
48use risingwave_hummock_sdk::{
49    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
50};
51use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
52use risingwave_pb::backup_service::*;
53use risingwave_pb::catalog::{
54    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
55    PbSubscription, PbTable, PbView, Table,
56};
57use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
58use risingwave_pb::cloud_service::*;
59use risingwave_pb::common::worker_node::Property;
60use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
61use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
62use risingwave_pb::ddl_service::alter_owner_request::Object;
63use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
64use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
65use risingwave_pb::ddl_service::drop_table_request::SourceId;
66use risingwave_pb::ddl_service::*;
67use risingwave_pb::hummock::compact_task::TaskStatus;
68use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
69use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
70use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
71use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
72use risingwave_pb::hummock::write_limits::WriteLimit;
73use risingwave_pb::hummock::*;
74use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
75use risingwave_pb::iceberg_compaction::{
76    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
77    subscribe_iceberg_compaction_event_request,
78};
79use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
80use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
81use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
82use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
83use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
84use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
85use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
86use risingwave_pb::meta::list_actor_states_response::ActorState;
87use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
88use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
89use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
90use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
91use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
92use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
93use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
94use risingwave_pb::meta::serving_service_client::ServingServiceClient;
95use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
96use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
97use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
98use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
99use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
100use risingwave_pb::meta::{FragmentDistribution, *};
101use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
102use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
103use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
104use risingwave_pb::secret::PbSecretRef;
105use risingwave_pb::stream_plan::StreamFragmentGraph;
106use risingwave_pb::user::update_user_request::UpdateField;
107use risingwave_pb::user::user_service_client::UserServiceClient;
108use risingwave_pb::user::*;
109use thiserror_ext::AsReport;
110use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
111use tokio::sync::oneshot::Sender;
112use tokio::sync::{RwLock, mpsc, oneshot};
113use tokio::task::JoinHandle;
114use tokio::time::{self};
115use tokio_retry::strategy::{ExponentialBackoff, jitter};
116use tokio_stream::wrappers::UnboundedReceiverStream;
117use tonic::transport::Endpoint;
118use tonic::{Code, Request, Streaming};
119
120use crate::channel::{Channel, WrappedChannelExt};
121use crate::error::{Result, RpcError};
122use crate::hummock_meta_client::{
123    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
124    IcebergCompactionEventItem,
125};
126use crate::meta_rpc_client_method_impl;
127
128type ConnectionId = u32;
129type DatabaseId = u32;
130type SchemaId = u32;
131
132/// Client to meta server. Cloning the instance is lightweight.
133#[derive(Clone, Debug)]
134pub struct MetaClient {
135    worker_id: u32,
136    worker_type: WorkerType,
137    host_addr: HostAddr,
138    inner: GrpcMetaClient,
139    meta_config: MetaConfig,
140    cluster_id: String,
141    shutting_down: Arc<AtomicBool>,
142}
143
144impl MetaClient {
145    pub fn worker_id(&self) -> u32 {
146        self.worker_id
147    }
148
149    pub fn host_addr(&self) -> &HostAddr {
150        &self.host_addr
151    }
152
153    pub fn worker_type(&self) -> WorkerType {
154        self.worker_type
155    }
156
157    pub fn cluster_id(&self) -> &str {
158        &self.cluster_id
159    }
160
161    /// Subscribe to notification from meta.
162    pub async fn subscribe(
163        &self,
164        subscribe_type: SubscribeType,
165    ) -> Result<Streaming<SubscribeResponse>> {
166        let request = SubscribeRequest {
167            subscribe_type: subscribe_type as i32,
168            host: Some(self.host_addr.to_protobuf()),
169            worker_id: self.worker_id(),
170        };
171
172        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
173            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
174            true,
175        );
176
177        tokio_retry::Retry::spawn(retry_strategy, || async {
178            let request = request.clone();
179            self.inner.subscribe(request).await
180        })
181        .await
182    }
183
184    pub async fn create_connection(
185        &self,
186        connection_name: String,
187        database_id: u32,
188        schema_id: u32,
189        owner_id: u32,
190        req: create_connection_request::Payload,
191    ) -> Result<WaitVersion> {
192        let request = CreateConnectionRequest {
193            name: connection_name,
194            database_id,
195            schema_id,
196            owner_id,
197            payload: Some(req),
198        };
199        let resp = self.inner.create_connection(request).await?;
200        Ok(resp
201            .version
202            .ok_or_else(|| anyhow!("wait version not set"))?)
203    }
204
205    pub async fn create_secret(
206        &self,
207        secret_name: String,
208        database_id: u32,
209        schema_id: u32,
210        owner_id: u32,
211        value: Vec<u8>,
212    ) -> Result<WaitVersion> {
213        let request = CreateSecretRequest {
214            name: secret_name,
215            database_id,
216            schema_id,
217            owner_id,
218            value,
219        };
220        let resp = self.inner.create_secret(request).await?;
221        Ok(resp
222            .version
223            .ok_or_else(|| anyhow!("wait version not set"))?)
224    }
225
226    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
227        let request = ListConnectionsRequest {};
228        let resp = self.inner.list_connections(request).await?;
229        Ok(resp.connections)
230    }
231
232    pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result<WaitVersion> {
233        let request = DropConnectionRequest { connection_id };
234        let resp = self.inner.drop_connection(request).await?;
235        Ok(resp
236            .version
237            .ok_or_else(|| anyhow!("wait version not set"))?)
238    }
239
240    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
241        let request = DropSecretRequest {
242            secret_id: secret_id.into(),
243        };
244        let resp = self.inner.drop_secret(request).await?;
245        Ok(resp
246            .version
247            .ok_or_else(|| anyhow!("wait version not set"))?)
248    }
249
250    /// Register the current node to the cluster and set the corresponding worker id.
251    ///
252    /// Retry if there's connection issue with the meta node. Exit the process if the registration fails.
253    pub async fn register_new(
254        addr_strategy: MetaAddressStrategy,
255        worker_type: WorkerType,
256        addr: &HostAddr,
257        property: Property,
258        meta_config: &MetaConfig,
259    ) -> (Self, SystemParamsReader) {
260        let ret =
261            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
262
263        match ret {
264            Ok(ret) => ret,
265            Err(err) => {
266                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
267                std::process::exit(1);
268            }
269        }
270    }
271
272    async fn register_new_inner(
273        addr_strategy: MetaAddressStrategy,
274        worker_type: WorkerType,
275        addr: &HostAddr,
276        property: Property,
277        meta_config: &MetaConfig,
278    ) -> Result<(Self, SystemParamsReader)> {
279        tracing::info!("register meta client using strategy: {}", addr_strategy);
280
281        // Retry until reaching `max_heartbeat_interval_secs`
282        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
283            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
284            true,
285        );
286
287        if property.is_unschedulable {
288            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
289        }
290        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
291            retry_strategy,
292            || async {
293                let grpc_meta_client =
294                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
295
296                let add_worker_resp = grpc_meta_client
297                    .add_worker_node(AddWorkerNodeRequest {
298                        worker_type: worker_type as i32,
299                        host: Some(addr.to_protobuf()),
300                        property: Some(property.clone()),
301                        resource: Some(risingwave_pb::common::worker_node::Resource {
302                            rw_version: RW_VERSION.to_owned(),
303                            total_memory_bytes: system_memory_available_bytes() as _,
304                            total_cpu_cores: total_cpu_available() as _,
305                        }),
306                    })
307                    .await
308                    .context("failed to add worker node")?;
309
310                let system_params_resp = grpc_meta_client
311                    .get_system_params(GetSystemParamsRequest {})
312                    .await
313                    .context("failed to get initial system params")?;
314
315                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
316            },
317            // Only retry if there's any transient connection issue.
318            // If the error is from our implementation or business, do not retry it.
319            |e: &RpcError| !e.is_from_tonic_server_impl(),
320        )
321        .await;
322
323        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
324        let worker_id = add_worker_resp
325            .node_id
326            .expect("AddWorkerNodeResponse::node_id is empty");
327
328        let meta_client = Self {
329            worker_id,
330            worker_type,
331            host_addr: addr.clone(),
332            inner: grpc_meta_client,
333            meta_config: meta_config.to_owned(),
334            cluster_id: add_worker_resp.cluster_id,
335            shutting_down: Arc::new(false.into()),
336        };
337
338        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
339        REPORT_PANIC.call_once(|| {
340            let meta_client_clone = meta_client.clone();
341            std::panic::update_hook(move |default_hook, info| {
342                // Try to report panic event to meta node.
343                meta_client_clone.try_add_panic_event_blocking(info, None);
344                default_hook(info);
345            });
346        });
347
348        Ok((meta_client, system_params_resp.params.unwrap().into()))
349    }
350
351    /// Activate the current node in cluster to confirm it's ready to serve.
352    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
353        let request = ActivateWorkerNodeRequest {
354            host: Some(addr.to_protobuf()),
355            node_id: self.worker_id,
356        };
357        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
358            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
359            true,
360        );
361        tokio_retry::Retry::spawn(retry_strategy, || async {
362            let request = request.clone();
363            self.inner.activate_worker_node(request).await
364        })
365        .await?;
366
367        Ok(())
368    }
369
370    /// Send heartbeat signal to meta service.
371    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
372        let request = HeartbeatRequest { node_id };
373        let resp = self.inner.heartbeat(request).await?;
374        if let Some(status) = resp.status {
375            if status.code() == risingwave_pb::common::status::Code::UnknownWorker {
376                // Ignore the error if we're already shutting down.
377                // Otherwise, exit the process.
378                if !self.shutting_down.load(Relaxed) {
379                    tracing::error!(message = status.message, "worker expired");
380                    std::process::exit(1);
381                }
382            }
383        }
384        Ok(())
385    }
386
387    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
388        let request = CreateDatabaseRequest { db: Some(db) };
389        let resp = self.inner.create_database(request).await?;
390        // TODO: handle error in `resp.status` here
391        Ok(resp
392            .version
393            .ok_or_else(|| anyhow!("wait version not set"))?)
394    }
395
396    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
397        let request = CreateSchemaRequest {
398            schema: Some(schema),
399        };
400        let resp = self.inner.create_schema(request).await?;
401        // TODO: handle error in `resp.status` here
402        Ok(resp
403            .version
404            .ok_or_else(|| anyhow!("wait version not set"))?)
405    }
406
407    pub async fn create_materialized_view(
408        &self,
409        table: PbTable,
410        graph: StreamFragmentGraph,
411        dependencies: HashSet<ObjectId>,
412        specific_resource_group: Option<String>,
413        if_not_exists: bool,
414    ) -> Result<WaitVersion> {
415        let request = CreateMaterializedViewRequest {
416            materialized_view: Some(table),
417            fragment_graph: Some(graph),
418            backfill: PbBackfillType::Regular as _,
419            dependencies: dependencies.into_iter().collect(),
420            specific_resource_group,
421            if_not_exists,
422        };
423        let resp = self.inner.create_materialized_view(request).await?;
424        // TODO: handle error in `resp.status` here
425        Ok(resp
426            .version
427            .ok_or_else(|| anyhow!("wait version not set"))?)
428    }
429
430    pub async fn drop_materialized_view(
431        &self,
432        table_id: TableId,
433        cascade: bool,
434    ) -> Result<WaitVersion> {
435        let request = DropMaterializedViewRequest {
436            table_id: table_id.table_id(),
437            cascade,
438        };
439
440        let resp = self.inner.drop_materialized_view(request).await?;
441        Ok(resp
442            .version
443            .ok_or_else(|| anyhow!("wait version not set"))?)
444    }
445
446    pub async fn create_source(
447        &self,
448        source: PbSource,
449        graph: Option<StreamFragmentGraph>,
450        if_not_exists: bool,
451    ) -> Result<WaitVersion> {
452        let request = CreateSourceRequest {
453            source: Some(source),
454            fragment_graph: graph,
455            if_not_exists,
456        };
457
458        let resp = self.inner.create_source(request).await?;
459        Ok(resp
460            .version
461            .ok_or_else(|| anyhow!("wait version not set"))?)
462    }
463
464    pub async fn create_sink(
465        &self,
466        sink: PbSink,
467        graph: StreamFragmentGraph,
468        affected_table_change: Option<ReplaceJobPlan>,
469        dependencies: HashSet<ObjectId>,
470        if_not_exists: bool,
471    ) -> Result<WaitVersion> {
472        let request = CreateSinkRequest {
473            sink: Some(sink),
474            fragment_graph: Some(graph),
475            affected_table_change,
476            dependencies: dependencies.into_iter().collect(),
477            if_not_exists,
478        };
479
480        let resp = self.inner.create_sink(request).await?;
481        Ok(resp
482            .version
483            .ok_or_else(|| anyhow!("wait version not set"))?)
484    }
485
486    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
487        let request = CreateSubscriptionRequest {
488            subscription: Some(subscription),
489        };
490
491        let resp = self.inner.create_subscription(request).await?;
492        Ok(resp
493            .version
494            .ok_or_else(|| anyhow!("wait version not set"))?)
495    }
496
497    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
498        let request = CreateFunctionRequest {
499            function: Some(function),
500        };
501        let resp = self.inner.create_function(request).await?;
502        Ok(resp
503            .version
504            .ok_or_else(|| anyhow!("wait version not set"))?)
505    }
506
507    pub async fn create_table(
508        &self,
509        source: Option<PbSource>,
510        table: PbTable,
511        graph: StreamFragmentGraph,
512        job_type: PbTableJobType,
513        if_not_exists: bool,
514    ) -> Result<WaitVersion> {
515        let request = CreateTableRequest {
516            materialized_view: Some(table),
517            fragment_graph: Some(graph),
518            source,
519            job_type: job_type as _,
520            if_not_exists,
521        };
522        let resp = self.inner.create_table(request).await?;
523        // TODO: handle error in `resp.status` here
524        Ok(resp
525            .version
526            .ok_or_else(|| anyhow!("wait version not set"))?)
527    }
528
529    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
530        let request = CommentOnRequest {
531            comment: Some(comment),
532        };
533        let resp = self.inner.comment_on(request).await?;
534        Ok(resp
535            .version
536            .ok_or_else(|| anyhow!("wait version not set"))?)
537    }
538
539    pub async fn alter_name(
540        &self,
541        object: alter_name_request::Object,
542        name: &str,
543    ) -> Result<WaitVersion> {
544        let request = AlterNameRequest {
545            object: Some(object),
546            new_name: name.to_owned(),
547        };
548        let resp = self.inner.alter_name(request).await?;
549        Ok(resp
550            .version
551            .ok_or_else(|| anyhow!("wait version not set"))?)
552    }
553
554    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
555        let request = AlterOwnerRequest {
556            object: Some(object),
557            owner_id,
558        };
559        let resp = self.inner.alter_owner(request).await?;
560        Ok(resp
561            .version
562            .ok_or_else(|| anyhow!("wait version not set"))?)
563    }
564
565    pub async fn alter_database_param(
566        &self,
567        database_id: DatabaseId,
568        param: AlterDatabaseParam,
569    ) -> Result<WaitVersion> {
570        let request = match param {
571            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
572                let barrier_interval_ms = OptionalUint32 {
573                    value: barrier_interval_ms,
574                };
575                AlterDatabaseParamRequest {
576                    database_id,
577                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
578                        barrier_interval_ms,
579                    )),
580                }
581            }
582            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
583                let checkpoint_frequency = OptionalUint64 {
584                    value: checkpoint_frequency,
585                };
586                AlterDatabaseParamRequest {
587                    database_id,
588                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
589                        checkpoint_frequency,
590                    )),
591                }
592            }
593        };
594        let resp = self.inner.alter_database_param(request).await?;
595        Ok(resp
596            .version
597            .ok_or_else(|| anyhow!("wait version not set"))?)
598    }
599
600    pub async fn alter_set_schema(
601        &self,
602        object: alter_set_schema_request::Object,
603        new_schema_id: u32,
604    ) -> Result<WaitVersion> {
605        let request = AlterSetSchemaRequest {
606            new_schema_id,
607            object: Some(object),
608        };
609        let resp = self.inner.alter_set_schema(request).await?;
610        Ok(resp
611            .version
612            .ok_or_else(|| anyhow!("wait version not set"))?)
613    }
614
615    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
616        let request = AlterSourceRequest {
617            source: Some(source),
618        };
619        let resp = self.inner.alter_source(request).await?;
620        Ok(resp
621            .version
622            .ok_or_else(|| anyhow!("wait version not set"))?)
623    }
624
625    pub async fn alter_parallelism(
626        &self,
627        table_id: u32,
628        parallelism: PbTableParallelism,
629        deferred: bool,
630    ) -> Result<()> {
631        let request = AlterParallelismRequest {
632            table_id,
633            parallelism: Some(parallelism),
634            deferred,
635        };
636
637        self.inner.alter_parallelism(request).await?;
638        Ok(())
639    }
640
641    pub async fn alter_resource_group(
642        &self,
643        table_id: u32,
644        resource_group: Option<String>,
645        deferred: bool,
646    ) -> Result<()> {
647        let request = AlterResourceGroupRequest {
648            table_id,
649            resource_group,
650            deferred,
651        };
652
653        self.inner.alter_resource_group(request).await?;
654        Ok(())
655    }
656
657    pub async fn alter_swap_rename(
658        &self,
659        object: alter_swap_rename_request::Object,
660    ) -> Result<WaitVersion> {
661        let request = AlterSwapRenameRequest {
662            object: Some(object),
663        };
664        let resp = self.inner.alter_swap_rename(request).await?;
665        Ok(resp
666            .version
667            .ok_or_else(|| anyhow!("wait version not set"))?)
668    }
669
670    pub async fn alter_secret(
671        &self,
672        secret_id: u32,
673        secret_name: String,
674        database_id: u32,
675        schema_id: u32,
676        owner_id: u32,
677        value: Vec<u8>,
678    ) -> Result<WaitVersion> {
679        let request = AlterSecretRequest {
680            secret_id,
681            name: secret_name,
682            database_id,
683            schema_id,
684            owner_id,
685            value,
686        };
687        let resp = self.inner.alter_secret(request).await?;
688        Ok(resp
689            .version
690            .ok_or_else(|| anyhow!("wait version not set"))?)
691    }
692
693    pub async fn replace_job(
694        &self,
695        graph: StreamFragmentGraph,
696        replace_job: ReplaceJob,
697    ) -> Result<WaitVersion> {
698        let request = ReplaceJobPlanRequest {
699            plan: Some(ReplaceJobPlan {
700                fragment_graph: Some(graph),
701                replace_job: Some(replace_job),
702            }),
703        };
704        let resp = self.inner.replace_job_plan(request).await?;
705        // TODO: handle error in `resp.status` here
706        Ok(resp
707            .version
708            .ok_or_else(|| anyhow!("wait version not set"))?)
709    }
710
711    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
712        let request = AutoSchemaChangeRequest {
713            schema_change: Some(schema_change),
714        };
715        let _ = self.inner.auto_schema_change(request).await?;
716        Ok(())
717    }
718
719    pub async fn create_view(&self, view: PbView) -> Result<WaitVersion> {
720        let request = CreateViewRequest { view: Some(view) };
721        let resp = self.inner.create_view(request).await?;
722        // TODO: handle error in `resp.status` here
723        Ok(resp
724            .version
725            .ok_or_else(|| anyhow!("wait version not set"))?)
726    }
727
728    pub async fn create_index(
729        &self,
730        index: PbIndex,
731        table: PbTable,
732        graph: StreamFragmentGraph,
733        if_not_exists: bool,
734    ) -> Result<WaitVersion> {
735        let request = CreateIndexRequest {
736            index: Some(index),
737            index_table: Some(table),
738            fragment_graph: Some(graph),
739            if_not_exists,
740        };
741        let resp = self.inner.create_index(request).await?;
742        // TODO: handle error in `resp.status` here
743        Ok(resp
744            .version
745            .ok_or_else(|| anyhow!("wait version not set"))?)
746    }
747
748    pub async fn drop_table(
749        &self,
750        source_id: Option<u32>,
751        table_id: TableId,
752        cascade: bool,
753    ) -> Result<WaitVersion> {
754        let request = DropTableRequest {
755            source_id: source_id.map(SourceId::Id),
756            table_id: table_id.table_id(),
757            cascade,
758        };
759
760        let resp = self.inner.drop_table(request).await?;
761        Ok(resp
762            .version
763            .ok_or_else(|| anyhow!("wait version not set"))?)
764    }
765
766    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
767        let request = DropViewRequest { view_id, cascade };
768        let resp = self.inner.drop_view(request).await?;
769        Ok(resp
770            .version
771            .ok_or_else(|| anyhow!("wait version not set"))?)
772    }
773
774    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
775        let request = DropSourceRequest { source_id, cascade };
776        let resp = self.inner.drop_source(request).await?;
777        Ok(resp
778            .version
779            .ok_or_else(|| anyhow!("wait version not set"))?)
780    }
781
782    pub async fn drop_sink(
783        &self,
784        sink_id: u32,
785        cascade: bool,
786        affected_table_change: Option<ReplaceJobPlan>,
787    ) -> Result<WaitVersion> {
788        let request = DropSinkRequest {
789            sink_id,
790            cascade,
791            affected_table_change,
792        };
793        let resp = self.inner.drop_sink(request).await?;
794        Ok(resp
795            .version
796            .ok_or_else(|| anyhow!("wait version not set"))?)
797    }
798
799    pub async fn drop_subscription(
800        &self,
801        subscription_id: u32,
802        cascade: bool,
803    ) -> Result<WaitVersion> {
804        let request = DropSubscriptionRequest {
805            subscription_id,
806            cascade,
807        };
808        let resp = self.inner.drop_subscription(request).await?;
809        Ok(resp
810            .version
811            .ok_or_else(|| anyhow!("wait version not set"))?)
812    }
813
814    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
815        let request = DropIndexRequest {
816            index_id: index_id.index_id,
817            cascade,
818        };
819        let resp = self.inner.drop_index(request).await?;
820        Ok(resp
821            .version
822            .ok_or_else(|| anyhow!("wait version not set"))?)
823    }
824
825    pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
826        let request = DropFunctionRequest {
827            function_id: function_id.0,
828        };
829        let resp = self.inner.drop_function(request).await?;
830        Ok(resp
831            .version
832            .ok_or_else(|| anyhow!("wait version not set"))?)
833    }
834
835    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
836        let request = DropDatabaseRequest { database_id };
837        let resp = self.inner.drop_database(request).await?;
838        Ok(resp
839            .version
840            .ok_or_else(|| anyhow!("wait version not set"))?)
841    }
842
843    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
844        let request = DropSchemaRequest { schema_id, cascade };
845        let resp = self.inner.drop_schema(request).await?;
846        Ok(resp
847            .version
848            .ok_or_else(|| anyhow!("wait version not set"))?)
849    }
850
851    // TODO: using UserInfoVersion instead as return type.
852    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
853        let request = CreateUserRequest { user: Some(user) };
854        let resp = self.inner.create_user(request).await?;
855        Ok(resp.version)
856    }
857
858    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
859        let request = DropUserRequest { user_id };
860        let resp = self.inner.drop_user(request).await?;
861        Ok(resp.version)
862    }
863
864    pub async fn update_user(
865        &self,
866        user: UserInfo,
867        update_fields: Vec<UpdateField>,
868    ) -> Result<u64> {
869        let request = UpdateUserRequest {
870            user: Some(user),
871            update_fields: update_fields
872                .into_iter()
873                .map(|field| field as i32)
874                .collect::<Vec<_>>(),
875        };
876        let resp = self.inner.update_user(request).await?;
877        Ok(resp.version)
878    }
879
880    pub async fn grant_privilege(
881        &self,
882        user_ids: Vec<u32>,
883        privileges: Vec<GrantPrivilege>,
884        with_grant_option: bool,
885        granted_by: u32,
886    ) -> Result<u64> {
887        let request = GrantPrivilegeRequest {
888            user_ids,
889            privileges,
890            with_grant_option,
891            granted_by,
892        };
893        let resp = self.inner.grant_privilege(request).await?;
894        Ok(resp.version)
895    }
896
897    pub async fn revoke_privilege(
898        &self,
899        user_ids: Vec<u32>,
900        privileges: Vec<GrantPrivilege>,
901        granted_by: u32,
902        revoke_by: u32,
903        revoke_grant_option: bool,
904        cascade: bool,
905    ) -> Result<u64> {
906        let request = RevokePrivilegeRequest {
907            user_ids,
908            privileges,
909            granted_by,
910            revoke_by,
911            revoke_grant_option,
912            cascade,
913        };
914        let resp = self.inner.revoke_privilege(request).await?;
915        Ok(resp.version)
916    }
917
918    /// Unregister the current node from the cluster.
919    pub async fn unregister(&self) -> Result<()> {
920        let request = DeleteWorkerNodeRequest {
921            host: Some(self.host_addr.to_protobuf()),
922        };
923        self.inner.delete_worker_node(request).await?;
924        self.shutting_down.store(true, Relaxed);
925        Ok(())
926    }
927
928    /// Try to unregister the current worker from the cluster with best effort. Log the result.
929    pub async fn try_unregister(&self) {
930        match self.unregister().await {
931            Ok(_) => {
932                tracing::info!(
933                    worker_id = self.worker_id(),
934                    "successfully unregistered from meta service",
935                )
936            }
937            Err(e) => {
938                tracing::warn!(
939                    error = %e.as_report(),
940                    worker_id = self.worker_id(),
941                    "failed to unregister from meta service",
942                );
943            }
944        }
945    }
946
947    pub async fn update_schedulability(
948        &self,
949        worker_ids: &[u32],
950        schedulability: Schedulability,
951    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
952        let request = UpdateWorkerNodeSchedulabilityRequest {
953            worker_ids: worker_ids.to_vec(),
954            schedulability: schedulability.into(),
955        };
956        let resp = self
957            .inner
958            .update_worker_node_schedulability(request)
959            .await?;
960        Ok(resp)
961    }
962
963    pub async fn list_worker_nodes(
964        &self,
965        worker_type: Option<WorkerType>,
966    ) -> Result<Vec<WorkerNode>> {
967        let request = ListAllNodesRequest {
968            worker_type: worker_type.map(Into::into),
969            include_starting_nodes: true,
970        };
971        let resp = self.inner.list_all_nodes(request).await?;
972        Ok(resp.nodes)
973    }
974
975    /// Starts a heartbeat worker.
976    pub fn start_heartbeat_loop(
977        meta_client: MetaClient,
978        min_interval: Duration,
979    ) -> (JoinHandle<()>, Sender<()>) {
980        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
981        let join_handle = tokio::spawn(async move {
982            let mut min_interval_ticker = tokio::time::interval(min_interval);
983            loop {
984                tokio::select! {
985                    biased;
986                    // Shutdown
987                    _ = &mut shutdown_rx => {
988                        tracing::info!("Heartbeat loop is stopped");
989                        return;
990                    }
991                    // Wait for interval
992                    _ = min_interval_ticker.tick() => {},
993                }
994                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
995                match tokio::time::timeout(
996                    // TODO: decide better min_interval for timeout
997                    min_interval * 3,
998                    meta_client.send_heartbeat(meta_client.worker_id()),
999                )
1000                .await
1001                {
1002                    Ok(Ok(_)) => {}
1003                    Ok(Err(err)) => {
1004                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
1005                    }
1006                    Err(_) => {
1007                        tracing::warn!("Failed to send_heartbeat: timeout");
1008                    }
1009                }
1010            }
1011        });
1012        (join_handle, shutdown_tx)
1013    }
1014
1015    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1016        let request = RisectlListStateTablesRequest {};
1017        let resp = self.inner.risectl_list_state_tables(request).await?;
1018        Ok(resp.tables)
1019    }
1020
1021    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1022        let request = FlushRequest { database_id };
1023        let resp = self.inner.flush(request).await?;
1024        Ok(HummockVersionId::new(resp.hummock_version_id))
1025    }
1026
1027    pub async fn wait(&self) -> Result<()> {
1028        let request = WaitRequest {};
1029        self.inner.wait(request).await?;
1030        Ok(())
1031    }
1032
1033    pub async fn recover(&self) -> Result<()> {
1034        let request = RecoverRequest {};
1035        self.inner.recover(request).await?;
1036        Ok(())
1037    }
1038
1039    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1040        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1041        let resp = self.inner.cancel_creating_jobs(request).await?;
1042        Ok(resp.canceled_jobs)
1043    }
1044
1045    pub async fn list_table_fragments(
1046        &self,
1047        table_ids: &[u32],
1048    ) -> Result<HashMap<u32, TableFragmentInfo>> {
1049        let request = ListTableFragmentsRequest {
1050            table_ids: table_ids.to_vec(),
1051        };
1052        let resp = self.inner.list_table_fragments(request).await?;
1053        Ok(resp.table_fragments)
1054    }
1055
1056    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1057        let resp = self
1058            .inner
1059            .list_streaming_job_states(ListStreamingJobStatesRequest {})
1060            .await?;
1061        Ok(resp.states)
1062    }
1063
1064    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
1065        let resp = self
1066            .inner
1067            .list_fragment_distribution(ListFragmentDistributionRequest {})
1068            .await?;
1069        Ok(resp.distributions)
1070    }
1071
1072    pub async fn list_creating_stream_scan_fragment_distribution(
1073        &self,
1074    ) -> Result<Vec<FragmentDistribution>> {
1075        let resp = self
1076            .inner
1077            .list_creating_stream_scan_fragment_distribution(
1078                ListCreatingStreamScanFragmentDistributionRequest {},
1079            )
1080            .await?;
1081        Ok(resp.distributions)
1082    }
1083
1084    pub async fn get_fragment_by_id(
1085        &self,
1086        fragment_id: u32,
1087    ) -> Result<Option<FragmentDistribution>> {
1088        let resp = self
1089            .inner
1090            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1091            .await?;
1092        Ok(resp.distribution)
1093    }
1094
1095    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1096        let resp = self
1097            .inner
1098            .list_actor_states(ListActorStatesRequest {})
1099            .await?;
1100        Ok(resp.states)
1101    }
1102
1103    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1104        let resp = self
1105            .inner
1106            .list_actor_splits(ListActorSplitsRequest {})
1107            .await?;
1108
1109        Ok(resp.actor_splits)
1110    }
1111
1112    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
1113        let resp = self
1114            .inner
1115            .list_object_dependencies(ListObjectDependenciesRequest {})
1116            .await?;
1117        Ok(resp.dependencies)
1118    }
1119
1120    pub async fn pause(&self) -> Result<PauseResponse> {
1121        let request = PauseRequest {};
1122        let resp = self.inner.pause(request).await?;
1123        Ok(resp)
1124    }
1125
1126    pub async fn resume(&self) -> Result<ResumeResponse> {
1127        let request = ResumeRequest {};
1128        let resp = self.inner.resume(request).await?;
1129        Ok(resp)
1130    }
1131
1132    pub async fn apply_throttle(
1133        &self,
1134        kind: PbThrottleTarget,
1135        id: u32,
1136        rate: Option<u32>,
1137    ) -> Result<ApplyThrottleResponse> {
1138        let request = ApplyThrottleRequest {
1139            kind: kind as i32,
1140            id,
1141            rate,
1142        };
1143        let resp = self.inner.apply_throttle(request).await?;
1144        Ok(resp)
1145    }
1146
1147    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1148        let resp = self
1149            .inner
1150            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1151            .await?;
1152        Ok(resp.get_status().unwrap())
1153    }
1154
1155    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1156        let request = GetClusterInfoRequest {};
1157        let resp = self.inner.get_cluster_info(request).await?;
1158        Ok(resp)
1159    }
1160
1161    pub async fn reschedule(
1162        &self,
1163        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1164        revision: u64,
1165        resolve_no_shuffle_upstream: bool,
1166    ) -> Result<(bool, u64)> {
1167        let request = RescheduleRequest {
1168            revision,
1169            resolve_no_shuffle_upstream,
1170            worker_reschedules,
1171        };
1172        let resp = self.inner.reschedule(request).await?;
1173        Ok((resp.success, resp.revision))
1174    }
1175
1176    pub async fn risectl_get_pinned_versions_summary(
1177        &self,
1178    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1179        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1180        self.inner
1181            .rise_ctl_get_pinned_versions_summary(request)
1182            .await
1183    }
1184
1185    pub async fn risectl_get_checkpoint_hummock_version(
1186        &self,
1187    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1188        let request = RiseCtlGetCheckpointVersionRequest {};
1189        self.inner.rise_ctl_get_checkpoint_version(request).await
1190    }
1191
1192    pub async fn risectl_pause_hummock_version_checkpoint(
1193        &self,
1194    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1195        let request = RiseCtlPauseVersionCheckpointRequest {};
1196        self.inner.rise_ctl_pause_version_checkpoint(request).await
1197    }
1198
1199    pub async fn risectl_resume_hummock_version_checkpoint(
1200        &self,
1201    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1202        let request = RiseCtlResumeVersionCheckpointRequest {};
1203        self.inner.rise_ctl_resume_version_checkpoint(request).await
1204    }
1205
1206    pub async fn init_metadata_for_replay(
1207        &self,
1208        tables: Vec<PbTable>,
1209        compaction_groups: Vec<CompactionGroupInfo>,
1210    ) -> Result<()> {
1211        let req = InitMetadataForReplayRequest {
1212            tables,
1213            compaction_groups,
1214        };
1215        let _resp = self.inner.init_metadata_for_replay(req).await?;
1216        Ok(())
1217    }
1218
1219    pub async fn replay_version_delta(
1220        &self,
1221        version_delta: HummockVersionDelta,
1222    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1223        let req = ReplayVersionDeltaRequest {
1224            version_delta: Some(version_delta.into()),
1225        };
1226        let resp = self.inner.replay_version_delta(req).await?;
1227        Ok((
1228            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1229            resp.modified_compaction_groups,
1230        ))
1231    }
1232
1233    pub async fn list_version_deltas(
1234        &self,
1235        start_id: HummockVersionId,
1236        num_limit: u32,
1237        committed_epoch_limit: HummockEpoch,
1238    ) -> Result<Vec<HummockVersionDelta>> {
1239        let req = ListVersionDeltasRequest {
1240            start_id: start_id.to_u64(),
1241            num_limit,
1242            committed_epoch_limit,
1243        };
1244        Ok(self
1245            .inner
1246            .list_version_deltas(req)
1247            .await?
1248            .version_deltas
1249            .unwrap()
1250            .version_deltas
1251            .iter()
1252            .map(HummockVersionDelta::from_rpc_protobuf)
1253            .collect())
1254    }
1255
1256    pub async fn trigger_compaction_deterministic(
1257        &self,
1258        version_id: HummockVersionId,
1259        compaction_groups: Vec<CompactionGroupId>,
1260    ) -> Result<()> {
1261        let req = TriggerCompactionDeterministicRequest {
1262            version_id: version_id.to_u64(),
1263            compaction_groups,
1264        };
1265        self.inner.trigger_compaction_deterministic(req).await?;
1266        Ok(())
1267    }
1268
1269    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1270        let req = DisableCommitEpochRequest {};
1271        Ok(HummockVersion::from_rpc_protobuf(
1272            &self
1273                .inner
1274                .disable_commit_epoch(req)
1275                .await?
1276                .current_version
1277                .unwrap(),
1278        ))
1279    }
1280
1281    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1282        let req = GetAssignedCompactTaskNumRequest {};
1283        let resp = self.inner.get_assigned_compact_task_num(req).await?;
1284        Ok(resp.num_tasks as usize)
1285    }
1286
1287    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1288        let req = RiseCtlListCompactionGroupRequest {};
1289        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1290        Ok(resp.compaction_groups)
1291    }
1292
1293    pub async fn risectl_update_compaction_config(
1294        &self,
1295        compaction_groups: &[CompactionGroupId],
1296        configs: &[MutableConfig],
1297    ) -> Result<()> {
1298        let req = RiseCtlUpdateCompactionConfigRequest {
1299            compaction_group_ids: compaction_groups.to_vec(),
1300            configs: configs
1301                .iter()
1302                .map(
1303                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1304                        mutable_config: Some(c.clone()),
1305                    },
1306                )
1307                .collect(),
1308        };
1309        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1310        Ok(())
1311    }
1312
1313    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1314        let req = BackupMetaRequest { remarks };
1315        let resp = self.inner.backup_meta(req).await?;
1316        Ok(resp.job_id)
1317    }
1318
1319    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1320        let req = GetBackupJobStatusRequest { job_id };
1321        let resp = self.inner.get_backup_job_status(req).await?;
1322        Ok((resp.job_status(), resp.message))
1323    }
1324
1325    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1326        let req = DeleteMetaSnapshotRequest {
1327            snapshot_ids: snapshot_ids.to_vec(),
1328        };
1329        let _resp = self.inner.delete_meta_snapshot(req).await?;
1330        Ok(())
1331    }
1332
1333    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1334        let req = GetMetaSnapshotManifestRequest {};
1335        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1336        Ok(resp.manifest.expect("should exist"))
1337    }
1338
1339    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1340        let req = GetTelemetryInfoRequest {};
1341        let resp = self.inner.get_telemetry_info(req).await?;
1342        Ok(resp)
1343    }
1344
1345    pub async fn get_system_params(&self) -> Result<SystemParamsReader> {
1346        let req = GetSystemParamsRequest {};
1347        let resp = self.inner.get_system_params(req).await?;
1348        Ok(resp.params.unwrap().into())
1349    }
1350
1351    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1352        let req = GetMetaStoreInfoRequest {};
1353        let resp = self.inner.get_meta_store_info(req).await?;
1354        Ok(resp.meta_store_endpoint)
1355    }
1356
1357    pub async fn alter_sink_props(
1358        &self,
1359        sink_id: u32,
1360        changed_props: BTreeMap<String, String>,
1361        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1362        connector_conn_ref: Option<u32>,
1363    ) -> Result<()> {
1364        let req = AlterConnectorPropsRequest {
1365            object_id: sink_id,
1366            changed_props: changed_props.into_iter().collect(),
1367            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1368            connector_conn_ref,
1369            object_type: AlterConnectorPropsObject::Sink as i32,
1370        };
1371        let _resp = self.inner.alter_connector_props(req).await?;
1372        Ok(())
1373    }
1374
1375    pub async fn set_system_param(
1376        &self,
1377        param: String,
1378        value: Option<String>,
1379    ) -> Result<Option<SystemParamsReader>> {
1380        let req = SetSystemParamRequest { param, value };
1381        let resp = self.inner.set_system_param(req).await?;
1382        Ok(resp.params.map(SystemParamsReader::from))
1383    }
1384
1385    pub async fn get_session_params(&self) -> Result<String> {
1386        let req = GetSessionParamsRequest {};
1387        let resp = self.inner.get_session_params(req).await?;
1388        Ok(resp.params)
1389    }
1390
1391    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1392        let req = SetSessionParamRequest { param, value };
1393        let resp = self.inner.set_session_param(req).await?;
1394        Ok(resp.param)
1395    }
1396
1397    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1398        let req = GetDdlProgressRequest {};
1399        let resp = self.inner.get_ddl_progress(req).await?;
1400        Ok(resp.ddl_progress)
1401    }
1402
1403    pub async fn split_compaction_group(
1404        &self,
1405        group_id: CompactionGroupId,
1406        table_ids_to_new_group: &[StateTableId],
1407        partition_vnode_count: u32,
1408    ) -> Result<CompactionGroupId> {
1409        let req = SplitCompactionGroupRequest {
1410            group_id,
1411            table_ids: table_ids_to_new_group.to_vec(),
1412            partition_vnode_count,
1413        };
1414        let resp = self.inner.split_compaction_group(req).await?;
1415        Ok(resp.new_group_id)
1416    }
1417
1418    pub async fn get_tables(
1419        &self,
1420        table_ids: &[u32],
1421        include_dropped_tables: bool,
1422    ) -> Result<HashMap<u32, Table>> {
1423        let req = GetTablesRequest {
1424            table_ids: table_ids.to_vec(),
1425            include_dropped_tables,
1426        };
1427        let resp = self.inner.get_tables(req).await?;
1428        Ok(resp.tables)
1429    }
1430
1431    pub async fn list_serving_vnode_mappings(
1432        &self,
1433    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
1434        let req = GetServingVnodeMappingsRequest {};
1435        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1436        let mappings = resp
1437            .worker_slot_mappings
1438            .into_iter()
1439            .map(|p| {
1440                (
1441                    p.fragment_id,
1442                    (
1443                        resp.fragment_to_table
1444                            .get(&p.fragment_id)
1445                            .cloned()
1446                            .unwrap_or(0),
1447                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1448                    ),
1449                )
1450            })
1451            .collect();
1452        Ok(mappings)
1453    }
1454
1455    pub async fn risectl_list_compaction_status(
1456        &self,
1457    ) -> Result<(
1458        Vec<CompactStatus>,
1459        Vec<CompactTaskAssignment>,
1460        Vec<CompactTaskProgress>,
1461    )> {
1462        let req = RiseCtlListCompactionStatusRequest {};
1463        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1464        Ok((
1465            resp.compaction_statuses,
1466            resp.task_assignment,
1467            resp.task_progress,
1468        ))
1469    }
1470
1471    pub async fn get_compaction_score(
1472        &self,
1473        compaction_group_id: CompactionGroupId,
1474    ) -> Result<Vec<PickerInfo>> {
1475        let req = GetCompactionScoreRequest {
1476            compaction_group_id,
1477        };
1478        let resp = self.inner.get_compaction_score(req).await?;
1479        Ok(resp.scores)
1480    }
1481
1482    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1483        let req = RiseCtlRebuildTableStatsRequest {};
1484        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1485        Ok(())
1486    }
1487
1488    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1489        let req = ListBranchedObjectRequest {};
1490        let resp = self.inner.list_branched_object(req).await?;
1491        Ok(resp.branched_objects)
1492    }
1493
1494    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
1495        let req = ListActiveWriteLimitRequest {};
1496        let resp = self.inner.list_active_write_limit(req).await?;
1497        Ok(resp.write_limits)
1498    }
1499
1500    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1501        let req = ListHummockMetaConfigRequest {};
1502        let resp = self.inner.list_hummock_meta_config(req).await?;
1503        Ok(resp.configs)
1504    }
1505
1506    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1507        let _resp = self
1508            .inner
1509            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1510            .await?;
1511
1512        Ok(())
1513    }
1514
1515    pub async fn rw_cloud_validate_source(
1516        &self,
1517        source_type: SourceType,
1518        source_config: HashMap<String, String>,
1519    ) -> Result<RwCloudValidateSourceResponse> {
1520        let req = RwCloudValidateSourceRequest {
1521            source_type: source_type.into(),
1522            source_config,
1523        };
1524        let resp = self.inner.rw_cloud_validate_source(req).await?;
1525        Ok(resp)
1526    }
1527
1528    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1529        self.inner.core.read().await.sink_coordinate_client.clone()
1530    }
1531
1532    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1533        let req = ListCompactTaskAssignmentRequest {};
1534        let resp = self.inner.list_compact_task_assignment(req).await?;
1535        Ok(resp.task_assignment)
1536    }
1537
1538    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1539        let req = ListEventLogRequest::default();
1540        let resp = self.inner.list_event_log(req).await?;
1541        Ok(resp.event_logs)
1542    }
1543
1544    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1545        let req = ListCompactTaskProgressRequest {};
1546        let resp = self.inner.list_compact_task_progress(req).await?;
1547        Ok(resp.task_progress)
1548    }
1549
1550    #[cfg(madsim)]
1551    pub fn try_add_panic_event_blocking(
1552        &self,
1553        _panic_info: impl Display,
1554        _timeout_millis: Option<u64>,
1555    ) {
1556    }
1557
1558    /// Add a worker node panic event to the event log, blocking the calling thread until the event is sent or the timeout elapses. If `timeout_millis` is `None`, a default of 1000 ms is used.
1559    #[cfg(not(madsim))]
1560    pub fn try_add_panic_event_blocking(
1561        &self,
1562        panic_info: impl Display,
1563        timeout_millis: Option<u64>,
1564    ) {
1565        let event = event_log::EventWorkerNodePanic {
1566            worker_id: self.worker_id,
1567            worker_type: self.worker_type.into(),
1568            host_addr: Some(self.host_addr.to_protobuf()),
1569            panic_info: format!("{panic_info}"),
1570        };
1571        let grpc_meta_client = self.inner.clone();
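        // Report the event from a dedicated thread with its own current-thread runtime: this may
        // run in a panic context where no usable Tokio runtime is available, and `join()` below
        // blocks the caller until the RPC completes or the timeout elapses.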
1572        let _ = thread::spawn(move || {
1573            let rt = tokio::runtime::Builder::new_current_thread()
1574                .enable_all()
1575                .build()
1576                .unwrap();
1577            let req = AddEventLogRequest {
1578                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1579            };
1580            rt.block_on(async {
1581                let _ = tokio::time::timeout(
1582                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1583                    grpc_meta_client.add_event_log(req),
1584                )
1585                .await;
1586            });
1587        })
1588        .join();
1589    }
1590
1591    pub async fn add_sink_fail_event(
1592        &self,
1593        sink_id: u32,
1594        sink_name: String,
1595        connector: String,
1596        error: String,
1597    ) -> Result<()> {
1598        let event = event_log::EventSinkFail {
1599            sink_id,
1600            sink_name,
1601            connector,
1602            error,
1603        };
1604        let req = AddEventLogRequest {
1605            event: Some(add_event_log_request::Event::SinkFail(event)),
1606        };
1607        self.inner.add_event_log(req).await?;
1608        Ok(())
1609    }
1610
1611    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1612        let req = CancelCompactTaskRequest {
1613            task_id,
1614            task_status: task_status as _,
1615        };
1616        let resp = self.inner.cancel_compact_task(req).await?;
1617        Ok(resp.ret)
1618    }
1619
1620    pub async fn get_version_by_epoch(
1621        &self,
1622        epoch: HummockEpoch,
1623        table_id: u32,
1624    ) -> Result<PbHummockVersion> {
1625        let req = GetVersionByEpochRequest { epoch, table_id };
1626        let resp = self.inner.get_version_by_epoch(req).await?;
1627        Ok(resp.version.unwrap())
1628    }
1629
1630    pub async fn get_cluster_limits(
1631        &self,
1632    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1633        let req = GetClusterLimitsRequest {};
1634        let resp = self.inner.get_cluster_limits(req).await?;
1635        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1636    }
1637
1638    pub async fn merge_compaction_group(
1639        &self,
1640        left_group_id: CompactionGroupId,
1641        right_group_id: CompactionGroupId,
1642    ) -> Result<()> {
1643        let req = MergeCompactionGroupRequest {
1644            left_group_id,
1645            right_group_id,
1646        };
1647        self.inner.merge_compaction_group(req).await?;
1648        Ok(())
1649    }
1650
1651    /// List all rate limits for sources and backfills
1652    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1653        let request = ListRateLimitsRequest {};
1654        let resp = self.inner.list_rate_limits(request).await?;
1655        Ok(resp.rate_limits)
1656    }
1657
1658    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1659        let request = ListIcebergTablesRequest {};
1660        let resp = self.inner.list_iceberg_tables(request).await?;
1661        Ok(resp.iceberg_tables)
1662    }
1663
1664    /// Get the await tree of all nodes in the cluster.
1665    pub async fn get_cluster_stack_trace(
1666        &self,
1667        actor_traces_format: ActorTracesFormat,
1668    ) -> Result<StackTraceResponse> {
1669        let request = StackTraceRequest {
1670            actor_traces_format: actor_traces_format as i32,
1671        };
1672        let resp = self.inner.stack_trace(request).await?;
1673        Ok(resp)
1674    }
1675}
1676
1677#[async_trait]
1678impl HummockMetaClient for MetaClient {
1679    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1680        let req = UnpinVersionBeforeRequest {
1681            context_id: self.worker_id(),
1682            unpin_version_before: unpin_version_before.to_u64(),
1683        };
1684        self.inner.unpin_version_before(req).await?;
1685        Ok(())
1686    }
1687
1688    async fn get_current_version(&self) -> Result<HummockVersion> {
1689        let req = GetCurrentVersionRequest::default();
1690        Ok(HummockVersion::from_rpc_protobuf(
1691            &self
1692                .inner
1693                .get_current_version(req)
1694                .await?
1695                .current_version
1696                .unwrap(),
1697        ))
1698    }
1699
1700    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1701        let resp = self
1702            .inner
1703            .get_new_object_ids(GetNewObjectIdsRequest { number })
1704            .await?;
1705        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1706    }
1707
1708    async fn commit_epoch_with_change_log(
1709        &self,
1710        _epoch: HummockEpoch,
1711        _sync_result: SyncResult,
1712        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1713    ) -> Result<()> {
1714        panic!("Only meta service can commit_epoch in production.")
1715    }
1716
1717    async fn trigger_manual_compaction(
1718        &self,
1719        compaction_group_id: u64,
1720        table_id: u32,
1721        level: u32,
1722        sst_ids: Vec<u64>,
1723    ) -> Result<()> {
1724        // TODO: support key_range parameter
1725        let req = TriggerManualCompactionRequest {
1726            compaction_group_id,
1727            table_id,
1728            // If table_id does not exist, manual compaction will include all SSTs
1729            // without checking internal_table_id.
1730            level,
1731            sst_ids,
1732            ..Default::default()
1733        };
1734
1735        self.inner.trigger_manual_compaction(req).await?;
1736        Ok(())
1737    }
1738
1739    async fn trigger_full_gc(
1740        &self,
1741        sst_retention_time_sec: u64,
1742        prefix: Option<String>,
1743    ) -> Result<()> {
1744        self.inner
1745            .trigger_full_gc(TriggerFullGcRequest {
1746                sst_retention_time_sec,
1747                prefix,
1748            })
1749            .await?;
1750        Ok(())
1751    }
1752
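    // Opens the bidirectional compaction-event stream: a `Register` message carrying this
    // worker's id is sent first, then the caller pushes further requests through the returned
    // sender and consumes compaction events from the returned stream.
    //
    // A rough usage sketch (names here are illustrative, not taken from this file):
    //
    //   let (tx, mut events) = hummock_meta_client.subscribe_compaction_event().await?;
    //   tx.send(SubscribeCompactionEventRequest { /* e.g. task progress */ ..Default::default() })?;
    //   while let Some(event) = events.next().await { /* handle the compaction event */ }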
1753    async fn subscribe_compaction_event(
1754        &self,
1755    ) -> Result<(
1756        UnboundedSender<SubscribeCompactionEventRequest>,
1757        BoxStream<'static, CompactionEventItem>,
1758    )> {
1759        let (request_sender, request_receiver) =
1760            unbounded_channel::<SubscribeCompactionEventRequest>();
1761        request_sender
1762            .send(SubscribeCompactionEventRequest {
1763                event: Some(subscribe_compaction_event_request::Event::Register(
1764                    Register {
1765                        context_id: self.worker_id(),
1766                    },
1767                )),
1768                create_at: SystemTime::now()
1769                    .duration_since(SystemTime::UNIX_EPOCH)
1770                    .expect("Clock may have gone backwards")
1771                    .as_millis() as u64,
1772            })
1773            .context("Failed to subscribe compaction event")?;
1774
1775        let stream = self
1776            .inner
1777            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1778                request_receiver,
1779            )))
1780            .await?;
1781
1782        Ok((request_sender, Box::pin(stream)))
1783    }
1784
1785    async fn get_version_by_epoch(
1786        &self,
1787        epoch: HummockEpoch,
1788        table_id: u32,
1789    ) -> Result<PbHummockVersion> {
1790        self.get_version_by_epoch(epoch, table_id).await
1791    }
1792
1793    async fn subscribe_iceberg_compaction_event(
1794        &self,
1795    ) -> Result<(
1796        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1797        BoxStream<'static, IcebergCompactionEventItem>,
1798    )> {
1799        let (request_sender, request_receiver) =
1800            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
1801        request_sender
1802            .send(SubscribeIcebergCompactionEventRequest {
1803                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
1804                    IcebergRegister {
1805                        context_id: self.worker_id(),
1806                    },
1807                )),
1808                create_at: SystemTime::now()
1809                    .duration_since(SystemTime::UNIX_EPOCH)
1810                    .expect("Clock may have gone backwards")
1811                    .as_millis() as u64,
1812            })
1813            .context("Failed to subscribe iceberg compaction event")?;
1814
1815        let stream = self
1816            .inner
1817            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
1818                request_receiver,
1819            )))
1820            .await?;
1821
1822        Ok((request_sender, Box::pin(stream)))
1823    }
1824}
1825
1826#[async_trait]
1827impl TelemetryInfoFetcher for MetaClient {
1828    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
1829        let resp = self
1830            .get_telemetry_info()
1831            .await
1832            .map_err(|e| e.to_report_string())?;
1833        let tracking_id = resp.get_tracking_id().ok();
1834        Ok(tracking_id.map(|id| id.to_owned()))
1835    }
1836}
1837
1838pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
1839
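/// One instance of every generated gRPC service client, all sharing a single [`Channel`] to the
/// current meta leader. The whole core is recreated when the leader changes.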
1840#[derive(Debug, Clone)]
1841struct GrpcMetaClientCore {
1842    cluster_client: ClusterServiceClient<Channel>,
1843    meta_member_client: MetaMemberServiceClient<Channel>,
1844    heartbeat_client: HeartbeatServiceClient<Channel>,
1845    ddl_client: DdlServiceClient<Channel>,
1846    hummock_client: HummockManagerServiceClient<Channel>,
1847    notification_client: NotificationServiceClient<Channel>,
1848    stream_client: StreamManagerServiceClient<Channel>,
1849    user_client: UserServiceClient<Channel>,
1850    scale_client: ScaleServiceClient<Channel>,
1851    backup_client: BackupServiceClient<Channel>,
1852    telemetry_client: TelemetryInfoServiceClient<Channel>,
1853    system_params_client: SystemParamsServiceClient<Channel>,
1854    session_params_client: SessionParamServiceClient<Channel>,
1855    serving_client: ServingServiceClient<Channel>,
1856    cloud_client: CloudServiceClient<Channel>,
1857    sink_coordinate_client: SinkCoordinationRpcClient,
1858    event_log_client: EventLogServiceClient<Channel>,
1859    cluster_limit_client: ClusterLimitServiceClient<Channel>,
1860    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
1861    monitor_client: MonitorServiceClient<Channel>,
1862}
1863
1864impl GrpcMetaClientCore {
1865    pub(crate) fn new(channel: Channel) -> Self {
1866        let cluster_client = ClusterServiceClient::new(channel.clone());
1867        let meta_member_client = MetaMemberClient::new(channel.clone());
1868        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
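        // Lift tonic's default 4 MiB decoding limit for services whose responses can be large
        // (catalogs, hummock versions, fragment information, and so on).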
1869        let ddl_client =
1870            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1871        let hummock_client =
1872            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1873        let notification_client =
1874            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1875        let stream_client =
1876            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1877        let user_client = UserServiceClient::new(channel.clone());
1878        let scale_client =
1879            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1880        let backup_client = BackupServiceClient::new(channel.clone());
1881        let telemetry_client =
1882            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1883        let system_params_client = SystemParamsServiceClient::new(channel.clone());
1884        let session_params_client = SessionParamServiceClient::new(channel.clone());
1885        let serving_client = ServingServiceClient::new(channel.clone());
1886        let cloud_client = CloudServiceClient::new(channel.clone());
1887        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
1888        let event_log_client = EventLogServiceClient::new(channel.clone());
1889        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
1890        let hosted_iceberg_catalog_service_client =
1891            HostedIcebergCatalogServiceClient::new(channel.clone());
1892        let monitor_client = MonitorServiceClient::new(channel);
1893
1894        GrpcMetaClientCore {
1895            cluster_client,
1896            meta_member_client,
1897            heartbeat_client,
1898            ddl_client,
1899            hummock_client,
1900            notification_client,
1901            stream_client,
1902            user_client,
1903            scale_client,
1904            backup_client,
1905            telemetry_client,
1906            system_params_client,
1907            session_params_client,
1908            serving_client,
1909            cloud_client,
1910            sink_coordinate_client,
1911            event_log_client,
1912            cluster_limit_client,
1913            hosted_iceberg_catalog_service_client,
1914            monitor_client,
1915        }
1916    }
1917}
1918
1919/// Client to meta server. Cloning the instance is lightweight.
1920///
1921/// It is a wrapper over the generated tonic clients. See [`crate::meta_rpc_client_method_impl`].
1922#[derive(Debug, Clone)]
1923struct GrpcMetaClient {
1924    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
1925    core: Arc<RwLock<GrpcMetaClientCore>>,
1926}
1927
1928type MetaMemberClient = MetaMemberServiceClient<Channel>;
1929
1930struct MetaMemberGroup {
1931    members: LruCache<http::Uri, Option<MetaMemberClient>>,
1932}
1933
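/// Tracks the known meta members and the current leader, rebuilding the shared
/// [`GrpcMetaClientCore`] whenever leadership moves to a different endpoint.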
1934struct MetaMemberManagement {
1935    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
1936    members: Either<MetaMemberClient, MetaMemberGroup>,
1937    current_leader: http::Uri,
1938    meta_config: MetaConfig,
1939}
1940
1941impl MetaMemberManagement {
1942    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
1943
1944    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
1945        format!("http://{}:{}", addr.host, addr.port)
1946            .parse()
1947            .unwrap()
1948    }
1949
1950    async fn recreate_core(&self, channel: Channel) {
1951        let mut core = self.core_ref.write().await;
1952        *core = GrpcMetaClientCore::new(channel);
1953    }
1954
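    /// Refresh the cached members and re-resolve the leader. With the load-balance strategy this
    /// asks the single member client; with the list strategy it probes the cached member group
    /// until one node answers, adding newly discovered members to the group. If a new leader is
    /// found, reconnect to it with bounded retries and rebuild the client core.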
1955    async fn refresh_members(&mut self) -> Result<()> {
1956        let leader_addr = match self.members.as_mut() {
1957            Either::Left(client) => {
1958                let resp = client
1959                    .to_owned()
1960                    .members(MembersRequest {})
1961                    .await
1962                    .map_err(RpcError::from_meta_status)?;
1963                let resp = resp.into_inner();
1964                resp.members.into_iter().find(|member| member.is_leader)
1965            }
1966            Either::Right(member_group) => {
1967                let mut fetched_members = None;
1968
1969                for (addr, client) in &mut member_group.members {
1970                    let members: Result<_> = try {
1971                        let mut client = match client {
1972                            Some(cached_client) => cached_client.to_owned(),
1973                            None => {
1974                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
1975                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
1976                                    .await
1977                                    .context("failed to create client")?;
1978                                let new_client: MetaMemberClient =
1979                                    MetaMemberServiceClient::new(channel);
1980                                *client = Some(new_client.clone());
1981
1982                                new_client
1983                            }
1984                        };
1985
1986                        let resp = client
1987                            .members(MembersRequest {})
1988                            .await
1989                            .context("failed to fetch members")?;
1990
1991                        resp.into_inner().members
1992                    };
1993
1994                    let fetched = members.is_ok();
1995                    fetched_members = Some(members);
1996                    if fetched {
1997                        break;
1998                    }
1999                }
2000
2001                let members = fetched_members
2002                    .context("no member available in the list")?
2003                    .context("could not refresh members")?;
2004
2005                // find new leader
2006                let mut leader = None;
2007                for member in members {
2008                    if member.is_leader {
2009                        leader = Some(member.clone());
2010                    }
2011
2012                    let addr = Self::host_address_to_uri(member.address.unwrap());
2013                    // Deliberately keep expired addresses in the cache so they can still be probed in extreme situations.
2014                    if !member_group.members.contains(&addr) {
2015                        tracing::info!("new meta member joined: {}", addr);
2016                        member_group.members.put(addr, None);
2017                    }
2018                }
2019
2020                leader
2021            }
2022        };
2023
2024        if let Some(leader) = leader_addr {
2025            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2026
2027            if discovered_leader != self.current_leader {
2028                tracing::info!("new meta leader {} discovered", discovered_leader);
2029
2030                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2031                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2032                    false,
2033                );
2034
2035                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2036                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2037                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2038                })
2039                .await?;
2040
2041                self.recreate_core(channel).await;
2042                self.current_leader = discovered_leader;
2043            }
2044        }
2045
2046        Ok(())
2047    }
2048}
2049
2050impl GrpcMetaClient {
2051    // See `Endpoint::http2_keep_alive_interval`
2052    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2053    // See `Endpoint::keep_alive_timeout`
2054    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2055    // Retry base interval in ms for connecting to meta server.
2056    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2057    // Max retry interval in ms for connecting to meta server.
2058    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2059
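    /// Spawn the background task that keeps the leader connection fresh. With the list strategy
    /// it refreshes members both on a periodic tick and on demand; with the load-balance strategy
    /// it only reacts to forced-refresh requests, reporting each refresh result back to the
    /// requester when one is waiting.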
2060    fn start_meta_member_monitor(
2061        &self,
2062        init_leader_addr: http::Uri,
2063        members: Either<MetaMemberClient, MetaMemberGroup>,
2064        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2065        meta_config: MetaConfig,
2066    ) -> Result<()> {
2067        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2068        let current_leader = init_leader_addr;
2069
2070        let enable_period_tick = matches!(members, Either::Right(_));
2071
2072        let member_management = MetaMemberManagement {
2073            core_ref,
2074            members,
2075            current_leader,
2076            meta_config,
2077        };
2078
2079        let mut force_refresh_receiver = force_refresh_receiver;
2080
2081        tokio::spawn(async move {
2082            let mut member_management = member_management;
2083            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2084
2085            loop {
2086                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2087                    tokio::select! {
2088                        _ = ticker.tick() => None,
2089                        result_sender = force_refresh_receiver.recv() => {
2090                            if result_sender.is_none() {
2091                                break;
2092                            }
2093
2094                            result_sender
2095                        },
2096                    }
2097                } else {
2098                    let result_sender = force_refresh_receiver.recv().await;
2099
2100                    if result_sender.is_none() {
2101                        break;
2102                    }
2103
2104                    result_sender
2105                };
2106
2107                let tick_result = member_management.refresh_members().await;
2108                if let Err(e) = tick_result.as_ref() {
2109                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2110                }
2111
2112                if let Some(sender) = event {
2113                    // Ignore the send error: the requester may have stopped waiting for the result.
2114                    let _resp = sender.send(tick_result);
2115                }
2116            }
2117        });
2118
2119        Ok(())
2120    }
2121
2122    async fn force_refresh_leader(&self) -> Result<()> {
2123        let (sender, receiver) = oneshot::channel();
2124
2125        self.member_monitor_event_sender
2126            .send(sender)
2127            .await
2128            .map_err(|e| anyhow!(e))?;
2129
2130        receiver.await.map_err(|e| anyhow!(e))?
2131    }
2132
2133    /// Connect to the meta server, resolving addresses according to `strategy`.
2134    pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result<Self> {
2135        let (channel, addr) = match strategy {
2136            MetaAddressStrategy::LoadBalance(addr) => {
2137                Self::try_build_rpc_channel(vec![addr.clone()]).await
2138            }
2139            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2140        }?;
2141        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2142        let client = GrpcMetaClient {
2143            member_monitor_event_sender: force_refresh_sender,
2144            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2145        };
2146
2147        let meta_member_client = client.core.read().await.meta_member_client.clone();
2148        let members = match strategy {
2149            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2150            MetaAddressStrategy::List(addrs) => {
2151                let mut members = LruCache::new(20);
2152                for addr in addrs {
2153                    members.put(addr.clone(), None);
2154                }
2155                members.put(addr.clone(), Some(meta_member_client));
2156
2157                Either::Right(MetaMemberGroup { members })
2158            }
2159        };
2160
2161        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config)?;
2162
2163        client.force_refresh_leader().await?;
2164
2165        Ok(client)
2166    }
2167
2168    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2169        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2170    }
2171
2172    pub(crate) async fn try_build_rpc_channel(
2173        addrs: impl IntoIterator<Item = http::Uri>,
2174    ) -> Result<(Channel, http::Uri)> {
2175        let endpoints: Vec<_> = addrs
2176            .into_iter()
2177            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2178            .collect();
2179
2180        let mut last_error = None;
2181
2182        for (endpoint, addr) in endpoints {
2183            match Self::connect_to_endpoint(endpoint).await {
2184                Ok(channel) => {
2185                    tracing::info!("Connected to meta server {} successfully", addr);
2186                    return Ok((channel, addr));
2187                }
2188                Err(e) => {
2189                    tracing::warn!(
2190                        error = %e.as_report(),
2191                        "Failed to connect to meta server {}, trying the next address",
2192                        addr,
2193                    );
2194                    last_error = Some(e);
2195                }
2196            }
2197        }
2198
2199        if let Some(last_error) = last_error {
2200            Err(anyhow::anyhow!(last_error)
2201                .context("failed to connect to all meta servers")
2202                .into())
2203        } else {
2204            bail!("no meta server address provided")
2205        }
2206    }
2207
2208    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2209        let channel = endpoint
2210            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2211            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2212            .connect_timeout(Duration::from_secs(5))
2213            .monitored_connect("grpc-meta-client", Default::default())
2214            .await?
2215            .wrapped();
2216
2217        Ok(channel)
2218    }
2219
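    /// Build a jittered exponential backoff (10 ms base, capped at 2 s per attempt) whose total
    /// delay is bounded by `high_bound`. With `exceed == true` the attempt that crosses the bound
    /// is still yielded; otherwise iteration stops strictly below it.
    ///
    /// A minimal usage sketch, mirroring the leader-reconnection call site above
    /// (`connect_somewhere` is a placeholder):
    ///
    /// ```ignore
    /// let strategy = GrpcMetaClient::retry_strategy_to_bound(Duration::from_secs(10), false);
    /// tokio_retry::Retry::spawn(strategy, || async { connect_somewhere().await }).await?;
    /// ```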
2220    pub(crate) fn retry_strategy_to_bound(
2221        high_bound: Duration,
2222        exceed: bool,
2223    ) -> impl Iterator<Item = Duration> {
2224        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2225            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2226            .map(jitter);
2227
2228        let mut sum = Duration::default();
2229
2230        iter.take_while(move |duration| {
2231            sum += *duration;
2232
2233            if exceed {
2234                sum < high_bound + *duration
2235            } else {
2236                sum < high_bound
2237            }
2238        })
2239    }
2240}
2241
2242macro_rules! for_all_meta_rpc {
2243    ($macro:ident) => {
2244        $macro! {
2245             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2246            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2247            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2248            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2249            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2250            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2251            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2252            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2253            ,{ stream_client, flush, FlushRequest, FlushResponse }
2254            ,{ stream_client, pause, PauseRequest, PauseResponse }
2255            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2256            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2257            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2258            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2259            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2260            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2261            ,{ stream_client, list_creating_stream_scan_fragment_distribution, ListCreatingStreamScanFragmentDistributionRequest, ListCreatingStreamScanFragmentDistributionResponse }
2262            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2263            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2264            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
2265            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2266            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2267            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2268            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2269            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2270            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2271            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2272            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2273            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2274            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2275            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2276            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2277            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2278            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2279            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2280            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2281            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2282            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2283            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2284            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2285            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2286            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2287            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2288            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2289            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2290            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2291            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2292            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2293            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2294            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2295            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2296            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2297            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2298            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2299            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2300            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2301            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2302            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2303            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2304            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2305            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2306            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2307            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2308            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2309            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2310            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2311            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2312            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2313            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2314            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2315            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2316            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2317            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2318            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2319            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2320            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2321            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2322            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2323            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2324            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2325            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2326            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2327            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2328            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2329            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2330            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2331            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2332            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2333            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2334            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2335            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2336            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2337            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2338            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2339            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2340            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2341            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2342            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2343            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2344            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2345            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2346            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2347            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2348            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2349            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2350            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2351            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2352            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2353            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2354            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2355            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2356            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2357            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2358            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2359            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2360            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2361            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2362            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2363            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2364            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2365        }
2366    };
2367}
2368
2369impl GrpcMetaClient {
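    /// Trigger a forced leader refresh when an RPC fails with a status code that usually means
    /// the current endpoint is stale or unreachable (`Unknown`, `Unimplemented`, `Unavailable`).
    /// If the monitor's channel is full, a refresh is already underway and this call only logs.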
2370    async fn refresh_client_if_needed(&self, code: Code) {
2371        if matches!(
2372            code,
2373            Code::Unknown | Code::Unimplemented | Code::Unavailable
2374        ) {
2375            tracing::debug!("tonic error code {} may indicate a stale meta leader, refreshing", code);
2376            let (result_sender, result_receiver) = oneshot::channel();
2377            if self
2378                .member_monitor_event_sender
2379                .try_send(result_sender)
2380                .is_ok()
2381            {
2382                if let Ok(Err(e)) = result_receiver.await {
2383                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2384                }
2385            } else {
2386                tracing::debug!("skipping this refresh since another one is already in progress")
2387            }
2388        }
2389    }
2390}
2391
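// The `for_all_meta_rpc!` table above is expanded by `meta_rpc_client_method_impl` (defined
// elsewhere in this crate) into one thin async wrapper per RPC on `GrpcMetaClient`. Each
// generated method is shaped roughly like the following sketch (not the literal expansion):
//
//   pub async fn flush(&self, request: FlushRequest) -> Result<FlushResponse> {
//       let mut client = self.core.read().await.stream_client.clone();
//       match client.flush(request).await {
//           Ok(resp) => Ok(resp.into_inner()),
//           Err(status) => {
//               self.refresh_client_if_needed(status.code()).await;
//               Err(RpcError::from_meta_status(status))
//           }
//       }
//   }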
2392impl GrpcMetaClient {
2393    for_all_meta_rpc! { meta_rpc_client_method_impl }
2394}