risingwave_rpc_client/meta_client.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

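// Note (added for readability; not from the original source): catalog identifiers are plain
// `u32`s on the wire, so the aliases below exist purely to make signatures easier to read.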
type ConnectionId = u32;
type DatabaseId = u32;
type SchemaId = u32;

/// Client to meta server. Cloning the instance is lightweight.
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: u32,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: MetaConfig,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> u32 {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

    /// Subscribe to notifications from the meta service.
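    ///
    /// The returned stream yields `SubscribeResponse` messages. A hedged consumption sketch
    /// (the `SubscribeType` variant shown is illustrative):
    ///
    /// ```ignore
    /// let mut stream = meta_client.subscribe(SubscribeType::Frontend).await?;
    /// while let Some(resp) = stream.message().await? {
    ///     // Handle the notification carried by `resp`.
    /// }
    /// ```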
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result<WaitVersion> {
        let request = DropConnectionRequest { connection_id };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest {
            secret_id: secret_id.into(),
        };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    /// Register the current node to the cluster and set the corresponding worker id.
    ///
    /// Retries if there is a connection issue with the meta node. Exits the process if registration fails.
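    ///
    /// A minimal lifecycle sketch (illustrative only; `strategy`, `addr`, `props` and `config`
    /// are placeholders, not values prescribed by this crate):
    ///
    /// ```ignore
    /// let (meta_client, system_params) =
    ///     MetaClient::register_new(strategy, WorkerType::ComputeNode, &addr, props, &config).await;
    /// meta_client.activate(&addr).await?;
    /// let (heartbeat_join, heartbeat_shutdown) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    /// ```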
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: &MetaConfig,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: &MetaConfig,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        // Retry until reaching `max_heartbeat_interval_secs`
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Only retry if there's any transient connection issue.
            // If the error is from our implementation or business, do not retry it.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.to_owned(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report panic event to meta node.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

    /// Activate the current node in the cluster to confirm it's ready to serve.
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

    /// Send a heartbeat signal to the meta service.
    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Ignore the error if we're already shutting down.
            // Otherwise, exit the process.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest {
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        affected_table_change: Option<ReplaceJobPlan>,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            affected_table_change,
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

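    /// Alter a per-database runtime parameter. A hedged sketch (`database_id` and
    /// `new_interval_ms` are placeholders for caller-supplied values):
    ///
    /// ```ignore
    /// let param = AlterDatabaseParam::BarrierIntervalMs(new_interval_ms);
    /// meta_client.alter_database_param(database_id, param).await?;
    /// ```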
    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: u32,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: u32,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(SourceId::Id),
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(
        &self,
        sink_id: u32,
        cascade: bool,
        affected_table_change: Option<ReplaceJobPlan>,
    ) -> Result<WaitVersion> {
        let request = DropSinkRequest {
            sink_id,
            cascade,
            affected_table_change,
        };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: u32,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest {
            index_id: index_id.index_id,
            cascade,
        };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id: function_id.0,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    // TODO: use `UserInfoVersion` as the return type instead.
    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

    /// Unregister the current node from the cluster.
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    /// Try to unregister the current worker from the cluster on a best-effort basis, logging the result.
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[u32],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

    /// Starts a heartbeat worker.
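    ///
    /// Returns the join handle of the spawned loop and a oneshot sender that stops it.
    /// A hedged shutdown sketch (the interval value is illustrative):
    ///
    /// ```ignore
    /// let (handle, shutdown_tx) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    /// // ... later, on shutdown:
    /// let _ = shutdown_tx.send(());
    /// handle.await.unwrap();
    /// ```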
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Wait for interval
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    // TODO: decide a better timeout (currently `min_interval * 3`)
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: table_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: u32,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    pub async fn alter_sink_props(
        &self,
        sink_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

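    /// Set a system parameter on the meta service; the updated parameters are returned when the
    /// meta node reports them back. A hedged sketch (the parameter name and value are
    /// illustrative, not an exhaustive or guaranteed-valid choice):
    ///
    /// ```ignore
    /// let new_params = meta_client
    ///     .set_system_param("checkpoint_frequency".to_owned(), Some("10".to_owned()))
    ///     .await?;
    /// ```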
1412    pub async fn set_system_param(
1413        &self,
1414        param: String,
1415        value: Option<String>,
1416    ) -> Result<Option<SystemParamsReader>> {
1417        let req = SetSystemParamRequest { param, value };
1418        let resp = self.inner.set_system_param(req).await?;
1419        Ok(resp.params.map(SystemParamsReader::from))
1420    }
1421
1422    pub async fn get_session_params(&self) -> Result<String> {
1423        let req = GetSessionParamsRequest {};
1424        let resp = self.inner.get_session_params(req).await?;
1425        Ok(resp.params)
1426    }
1427
1428    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1429        let req = SetSessionParamRequest { param, value };
1430        let resp = self.inner.set_session_param(req).await?;
1431        Ok(resp.param)
1432    }
1433
1434    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1435        let req = GetDdlProgressRequest {};
1436        let resp = self.inner.get_ddl_progress(req).await?;
1437        Ok(resp.ddl_progress)
1438    }
1439
1440    pub async fn split_compaction_group(
1441        &self,
1442        group_id: CompactionGroupId,
1443        table_ids_to_new_group: &[StateTableId],
1444        partition_vnode_count: u32,
1445    ) -> Result<CompactionGroupId> {
1446        let req = SplitCompactionGroupRequest {
1447            group_id,
1448            table_ids: table_ids_to_new_group.to_vec(),
1449            partition_vnode_count,
1450        };
1451        let resp = self.inner.split_compaction_group(req).await?;
1452        Ok(resp.new_group_id)
1453    }
1454
1455    pub async fn get_tables(
1456        &self,
1457        table_ids: &[u32],
1458        include_dropped_tables: bool,
1459    ) -> Result<HashMap<u32, Table>> {
1460        let req = GetTablesRequest {
1461            table_ids: table_ids.to_vec(),
1462            include_dropped_tables,
1463        };
1464        let resp = self.inner.get_tables(req).await?;
1465        Ok(resp.tables)
1466    }
1467
1468    pub async fn list_serving_vnode_mappings(
1469        &self,
1470    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
1471        let req = GetServingVnodeMappingsRequest {};
1472        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1473        let mappings = resp
1474            .worker_slot_mappings
1475            .into_iter()
1476            .map(|p| {
1477                (
1478                    p.fragment_id,
1479                    (
1480                        resp.fragment_to_table
1481                            .get(&p.fragment_id)
1482                            .cloned()
1483                            .unwrap_or(0),
1484                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1485                    ),
1486                )
1487            })
1488            .collect();
1489        Ok(mappings)
1490    }
1491
1492    pub async fn risectl_list_compaction_status(
1493        &self,
1494    ) -> Result<(
1495        Vec<CompactStatus>,
1496        Vec<CompactTaskAssignment>,
1497        Vec<CompactTaskProgress>,
1498    )> {
1499        let req = RiseCtlListCompactionStatusRequest {};
1500        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1501        Ok((
1502            resp.compaction_statuses,
1503            resp.task_assignment,
1504            resp.task_progress,
1505        ))
1506    }
1507
1508    pub async fn get_compaction_score(
1509        &self,
1510        compaction_group_id: CompactionGroupId,
1511    ) -> Result<Vec<PickerInfo>> {
1512        let req = GetCompactionScoreRequest {
1513            compaction_group_id,
1514        };
1515        let resp = self.inner.get_compaction_score(req).await?;
1516        Ok(resp.scores)
1517    }
1518
1519    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1520        let req = RiseCtlRebuildTableStatsRequest {};
1521        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1522        Ok(())
1523    }
1524
1525    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1526        let req = ListBranchedObjectRequest {};
1527        let resp = self.inner.list_branched_object(req).await?;
1528        Ok(resp.branched_objects)
1529    }
1530
1531    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
1532        let req = ListActiveWriteLimitRequest {};
1533        let resp = self.inner.list_active_write_limit(req).await?;
1534        Ok(resp.write_limits)
1535    }
1536
1537    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1538        let req = ListHummockMetaConfigRequest {};
1539        let resp = self.inner.list_hummock_meta_config(req).await?;
1540        Ok(resp.configs)
1541    }
1542
1543    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1544        let _resp = self
1545            .inner
1546            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1547            .await?;
1548
1549        Ok(())
1550    }
1551
1552    pub async fn rw_cloud_validate_source(
1553        &self,
1554        source_type: SourceType,
1555        source_config: HashMap<String, String>,
1556    ) -> Result<RwCloudValidateSourceResponse> {
1557        let req = RwCloudValidateSourceRequest {
1558            source_type: source_type.into(),
1559            source_config,
1560        };
1561        let resp = self.inner.rw_cloud_validate_source(req).await?;
1562        Ok(resp)
1563    }
1564
1565    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1566        self.inner.core.read().await.sink_coordinate_client.clone()
1567    }
1568
1569    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1570        let req = ListCompactTaskAssignmentRequest {};
1571        let resp = self.inner.list_compact_task_assignment(req).await?;
1572        Ok(resp.task_assignment)
1573    }
1574
1575    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1576        let req = ListEventLogRequest::default();
1577        let resp = self.inner.list_event_log(req).await?;
1578        Ok(resp.event_logs)
1579    }
1580
1581    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1582        let req = ListCompactTaskProgressRequest {};
1583        let resp = self.inner.list_compact_task_progress(req).await?;
1584        Ok(resp.task_progress)
1585    }
1586
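    /// No-op under madsim: panic events are not reported to the meta server in simulation.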
1587    #[cfg(madsim)]
1588    pub fn try_add_panic_event_blocking(
1589        &self,
1590        _panic_info: impl Display,
1591        _timeout_millis: Option<u64>,
1592    ) {
1593    }
1594
1595    /// If `timeout_millis` is `None`, a default timeout of 1000 ms is used.
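    ///
    /// A usage sketch from a panic hook (illustrative only; `meta_client` is an assumed,
    /// already-cloned [`MetaClient`]):
    /// ```ignore
    /// std::panic::set_hook(Box::new(move |info| {
    ///     // Block briefly so the event has a chance to reach the meta server before aborting.
    ///     meta_client.try_add_panic_event_blocking(info, Some(5000));
    /// }));
    /// ```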
1596    #[cfg(not(madsim))]
1597    pub fn try_add_panic_event_blocking(
1598        &self,
1599        panic_info: impl Display,
1600        timeout_millis: Option<u64>,
1601    ) {
1602        let event = event_log::EventWorkerNodePanic {
1603            worker_id: self.worker_id,
1604            worker_type: self.worker_type.into(),
1605            host_addr: Some(self.host_addr.to_protobuf()),
1606            panic_info: format!("{panic_info}"),
1607        };
1608        let grpc_meta_client = self.inner.clone();
1609        let _ = thread::spawn(move || {
1610            let rt = tokio::runtime::Builder::new_current_thread()
1611                .enable_all()
1612                .build()
1613                .unwrap();
1614            let req = AddEventLogRequest {
1615                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1616            };
1617            rt.block_on(async {
1618                let _ = tokio::time::timeout(
1619                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1620                    grpc_meta_client.add_event_log(req),
1621                )
1622                .await;
1623            });
1624        })
1625        .join();
1626    }
1627
1628    pub async fn add_sink_fail_evet(
1629        &self,
1630        sink_id: u32,
1631        sink_name: String,
1632        connector: String,
1633        error: String,
1634    ) -> Result<()> {
1635        let event = event_log::EventSinkFail {
1636            sink_id,
1637            sink_name,
1638            connector,
1639            error,
1640        };
1641        let req = AddEventLogRequest {
1642            event: Some(add_event_log_request::Event::SinkFail(event)),
1643        };
1644        self.inner.add_event_log(req).await?;
1645        Ok(())
1646    }
1647
1648    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1649        let req = CancelCompactTaskRequest {
1650            task_id,
1651            task_status: task_status as _,
1652        };
1653        let resp = self.inner.cancel_compact_task(req).await?;
1654        Ok(resp.ret)
1655    }
1656
1657    pub async fn get_version_by_epoch(
1658        &self,
1659        epoch: HummockEpoch,
1660        table_id: u32,
1661    ) -> Result<PbHummockVersion> {
1662        let req = GetVersionByEpochRequest { epoch, table_id };
1663        let resp = self.inner.get_version_by_epoch(req).await?;
1664        Ok(resp.version.unwrap())
1665    }
1666
1667    pub async fn get_cluster_limits(
1668        &self,
1669    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1670        let req = GetClusterLimitsRequest {};
1671        let resp = self.inner.get_cluster_limits(req).await?;
1672        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1673    }
1674
1675    pub async fn merge_compaction_group(
1676        &self,
1677        left_group_id: CompactionGroupId,
1678        right_group_id: CompactionGroupId,
1679    ) -> Result<()> {
1680        let req = MergeCompactionGroupRequest {
1681            left_group_id,
1682            right_group_id,
1683        };
1684        self.inner.merge_compaction_group(req).await?;
1685        Ok(())
1686    }
1687
1688    /// List all rate limits for sources and backfills.
1689    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1690        let request = ListRateLimitsRequest {};
1691        let resp = self.inner.list_rate_limits(request).await?;
1692        Ok(resp.rate_limits)
1693    }
1694
1695    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1696        let request = ListIcebergTablesRequest {};
1697        let resp = self.inner.list_iceberg_tables(request).await?;
1698        Ok(resp.iceberg_tables)
1699    }
1700
1701    /// Get the await tree of all nodes in the cluster.
1702    pub async fn get_cluster_stack_trace(
1703        &self,
1704        actor_traces_format: ActorTracesFormat,
1705    ) -> Result<StackTraceResponse> {
1706        let request = StackTraceRequest {
1707            actor_traces_format: actor_traces_format as i32,
1708        };
1709        let resp = self.inner.stack_trace(request).await?;
1710        Ok(resp)
1711    }
1712
1713    pub async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
1714        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
1715        self.inner.set_sync_log_store_aligned(request).await?;
1716        Ok(())
1717    }
1718}
1719
1720#[async_trait]
1721impl HummockMetaClient for MetaClient {
1722    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1723        let req = UnpinVersionBeforeRequest {
1724            context_id: self.worker_id(),
1725            unpin_version_before: unpin_version_before.to_u64(),
1726        };
1727        self.inner.unpin_version_before(req).await?;
1728        Ok(())
1729    }
1730
1731    async fn get_current_version(&self) -> Result<HummockVersion> {
1732        let req = GetCurrentVersionRequest::default();
1733        Ok(HummockVersion::from_rpc_protobuf(
1734            &self
1735                .inner
1736                .get_current_version(req)
1737                .await?
1738                .current_version
1739                .unwrap(),
1740        ))
1741    }
1742
1743    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1744        let resp = self
1745            .inner
1746            .get_new_object_ids(GetNewObjectIdsRequest { number })
1747            .await?;
1748        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1749    }
1750
1751    async fn commit_epoch_with_change_log(
1752        &self,
1753        _epoch: HummockEpoch,
1754        _sync_result: SyncResult,
1755        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1756    ) -> Result<()> {
1757        panic!("Only meta service can commit_epoch in production.")
1758    }
1759
1760    async fn trigger_manual_compaction(
1761        &self,
1762        compaction_group_id: u64,
1763        table_id: u32,
1764        level: u32,
1765        sst_ids: Vec<u64>,
1766    ) -> Result<()> {
1767        // TODO: support key_range parameter
1768        let req = TriggerManualCompactionRequest {
1769            compaction_group_id,
1770            table_id,
1771            // If `table_id` does not exist, manual compaction will include all the SSTs
1772            // without checking `internal_table_id`.
1773            level,
1774            sst_ids,
1775            ..Default::default()
1776        };
1777
1778        self.inner.trigger_manual_compaction(req).await?;
1779        Ok(())
1780    }
1781
1782    async fn trigger_full_gc(
1783        &self,
1784        sst_retention_time_sec: u64,
1785        prefix: Option<String>,
1786    ) -> Result<()> {
1787        self.inner
1788            .trigger_full_gc(TriggerFullGcRequest {
1789                sst_retention_time_sec,
1790                prefix,
1791            })
1792            .await?;
1793        Ok(())
1794    }
1795
1796    async fn subscribe_compaction_event(
1797        &self,
1798    ) -> Result<(
1799        UnboundedSender<SubscribeCompactionEventRequest>,
1800        BoxStream<'static, CompactionEventItem>,
1801    )> {
1802        let (request_sender, request_receiver) =
1803            unbounded_channel::<SubscribeCompactionEventRequest>();
1804        request_sender
1805            .send(SubscribeCompactionEventRequest {
1806                event: Some(subscribe_compaction_event_request::Event::Register(
1807                    Register {
1808                        context_id: self.worker_id(),
1809                    },
1810                )),
1811                create_at: SystemTime::now()
1812                    .duration_since(SystemTime::UNIX_EPOCH)
1813                    .expect("Clock may have gone backwards")
1814                    .as_millis() as u64,
1815            })
1816            .context("Failed to subscribe compaction event")?;
1817
1818        let stream = self
1819            .inner
1820            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1821                request_receiver,
1822            )))
1823            .await?;
1824
1825        Ok((request_sender, Box::pin(stream)))
1826    }
1827
1828    async fn get_version_by_epoch(
1829        &self,
1830        epoch: HummockEpoch,
1831        table_id: u32,
1832    ) -> Result<PbHummockVersion> {
1833        self.get_version_by_epoch(epoch, table_id).await
1834    }
1835
1836    async fn subscribe_iceberg_compaction_event(
1837        &self,
1838    ) -> Result<(
1839        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1840        BoxStream<'static, IcebergCompactionEventItem>,
1841    )> {
1842        let (request_sender, request_receiver) =
1843            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
1844        request_sender
1845            .send(SubscribeIcebergCompactionEventRequest {
1846                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
1847                    IcebergRegister {
1848                        context_id: self.worker_id(),
1849                    },
1850                )),
1851                create_at: SystemTime::now()
1852                    .duration_since(SystemTime::UNIX_EPOCH)
1853                    .expect("Clock may have gone backwards")
1854                    .as_millis() as u64,
1855            })
1856            .context("Failed to subscribe iceberg compaction event")?;
1857
1858        let stream = self
1859            .inner
1860            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
1861                request_receiver,
1862            )))
1863            .await?;
1864
1865        Ok((request_sender, Box::pin(stream)))
1866    }
1867}
1868
1869#[async_trait]
1870impl TelemetryInfoFetcher for MetaClient {
1871    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
1872        let resp = self
1873            .get_telemetry_info()
1874            .await
1875            .map_err(|e| e.to_report_string())?;
1876        let tracking_id = resp.get_tracking_id().ok();
1877        Ok(tracking_id.map(|id| id.to_owned()))
1878    }
1879}
1880
1881pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
1882
1883#[derive(Debug, Clone)]
1884struct GrpcMetaClientCore {
1885    cluster_client: ClusterServiceClient<Channel>,
1886    meta_member_client: MetaMemberServiceClient<Channel>,
1887    heartbeat_client: HeartbeatServiceClient<Channel>,
1888    ddl_client: DdlServiceClient<Channel>,
1889    hummock_client: HummockManagerServiceClient<Channel>,
1890    notification_client: NotificationServiceClient<Channel>,
1891    stream_client: StreamManagerServiceClient<Channel>,
1892    user_client: UserServiceClient<Channel>,
1893    scale_client: ScaleServiceClient<Channel>,
1894    backup_client: BackupServiceClient<Channel>,
1895    telemetry_client: TelemetryInfoServiceClient<Channel>,
1896    system_params_client: SystemParamsServiceClient<Channel>,
1897    session_params_client: SessionParamServiceClient<Channel>,
1898    serving_client: ServingServiceClient<Channel>,
1899    cloud_client: CloudServiceClient<Channel>,
1900    sink_coordinate_client: SinkCoordinationRpcClient,
1901    event_log_client: EventLogServiceClient<Channel>,
1902    cluster_limit_client: ClusterLimitServiceClient<Channel>,
1903    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
1904    monitor_client: MonitorServiceClient<Channel>,
1905}
1906
1907impl GrpcMetaClientCore {
1908    pub(crate) fn new(channel: Channel) -> Self {
1909        let cluster_client = ClusterServiceClient::new(channel.clone());
1910        let meta_member_client = MetaMemberClient::new(channel.clone());
1911        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
1912        let ddl_client =
1913            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1914        let hummock_client =
1915            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1916        let notification_client =
1917            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1918        let stream_client =
1919            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1920        let user_client = UserServiceClient::new(channel.clone());
1921        let scale_client =
1922            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1923        let backup_client = BackupServiceClient::new(channel.clone());
1924        let telemetry_client =
1925            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1926        let system_params_client = SystemParamsServiceClient::new(channel.clone());
1927        let session_params_client = SessionParamServiceClient::new(channel.clone());
1928        let serving_client = ServingServiceClient::new(channel.clone());
1929        let cloud_client = CloudServiceClient::new(channel.clone());
1930        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
1931        let event_log_client = EventLogServiceClient::new(channel.clone());
1932        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
1933        let hosted_iceberg_catalog_service_client =
1934            HostedIcebergCatalogServiceClient::new(channel.clone());
1935        let monitor_client = MonitorServiceClient::new(channel);
1936
1937        GrpcMetaClientCore {
1938            cluster_client,
1939            meta_member_client,
1940            heartbeat_client,
1941            ddl_client,
1942            hummock_client,
1943            notification_client,
1944            stream_client,
1945            user_client,
1946            scale_client,
1947            backup_client,
1948            telemetry_client,
1949            system_params_client,
1950            session_params_client,
1951            serving_client,
1952            cloud_client,
1953            sink_coordinate_client,
1954            event_log_client,
1955            cluster_limit_client,
1956            hosted_iceberg_catalog_service_client,
1957            monitor_client,
1958        }
1959    }
1960}
1961
1962/// Client to meta server. Cloning the instance is lightweight.
1963///
1964/// It is a wrapper over the tonic service clients. See [`crate::meta_rpc_client_method_impl`].
1965#[derive(Debug, Clone)]
1966struct GrpcMetaClient {
1967    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
1968    core: Arc<RwLock<GrpcMetaClientCore>>,
1969}
1970
1971type MetaMemberClient = MetaMemberServiceClient<Channel>;
1972
1973struct MetaMemberGroup {
1974    members: LruCache<http::Uri, Option<MetaMemberClient>>,
1975}
1976
1977struct MetaMemberManagement {
1978    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
1979    members: Either<MetaMemberClient, MetaMemberGroup>,
1980    current_leader: http::Uri,
1981    meta_config: MetaConfig,
1982}
1983
1984impl MetaMemberManagement {
1985    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
1986
1987    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
1988        format!("http://{}:{}", addr.host, addr.port)
1989            .parse()
1990            .unwrap()
1991    }
1992
1993    async fn recreate_core(&self, channel: Channel) {
1994        let mut core = self.core_ref.write().await;
1995        *core = GrpcMetaClientCore::new(channel);
1996    }
1997
1998    async fn refresh_members(&mut self) -> Result<()> {
1999        let leader_addr = match self.members.as_mut() {
2000            Either::Left(client) => {
2001                let resp = client
2002                    .to_owned()
2003                    .members(MembersRequest {})
2004                    .await
2005                    .map_err(RpcError::from_meta_status)?;
2006                let resp = resp.into_inner();
2007                resp.members.into_iter().find(|member| member.is_leader)
2008            }
2009            Either::Right(member_group) => {
2010                let mut fetched_members = None;
2011
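                // Try each known member in turn; stop at the first one that successfully
                // returns the member list.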
2012                for (addr, client) in &mut member_group.members {
2013                    let members: Result<_> = try {
2014                        let mut client = match client {
2015                            Some(cached_client) => cached_client.to_owned(),
2016                            None => {
2017                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2018                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2019                                    .await
2020                                    .context("failed to create client")?;
2021                                let new_client: MetaMemberClient =
2022                                    MetaMemberServiceClient::new(channel);
2023                                *client = Some(new_client.clone());
2024
2025                                new_client
2026                            }
2027                        };
2028
2029                        let resp = client
2030                            .members(MembersRequest {})
2031                            .await
2032                            .context("failed to fetch members")?;
2033
2034                        resp.into_inner().members
2035                    };
2036
2037                    let fetched = members.is_ok();
2038                    fetched_members = Some(members);
2039                    if fetched {
2040                        break;
2041                    }
2042                }
2043
2044                let members = fetched_members
2045                    .context("no member available in the list")?
2046                    .context("could not refresh members")?;
2047
2048                // Find the new leader and register any newly discovered members.
2049                let mut leader = None;
2050                for member in members {
2051                    if member.is_leader {
2052                        leader = Some(member.clone());
2053                    }
2054
2055                    let addr = Self::host_address_to_uri(member.address.unwrap());
2056                    // Expired addresses are deliberately not evicted here, to cope with some extreme situations.
2057                    if !member_group.members.contains(&addr) {
2058                        tracing::info!("new meta member joined: {}", addr);
2059                        member_group.members.put(addr, None);
2060                    }
2061                }
2062
2063                leader
2064            }
2065        };
2066
2067        if let Some(leader) = leader_addr {
2068            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2069
2070            if discovered_leader != self.current_leader {
2071                tracing::info!("new meta leader {} discovered", discovered_leader);
2072
2073                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2074                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2075                    false,
2076                );
2077
2078                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2079                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2080                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2081                })
2082                .await?;
2083
2084                self.recreate_core(channel).await;
2085                self.current_leader = discovered_leader;
2086            }
2087        }
2088
2089        Ok(())
2090    }
2091}
2092
2093impl GrpcMetaClient {
2094    // See `Endpoint::http2_keep_alive_interval`
2095    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2096    // See `Endpoint::keep_alive_timeout`
2097    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2098    // Retry base interval in ms for connecting to meta server.
2099    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2100    // Max retry interval in ms for connecting to meta server.
2101    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2102
2103    fn start_meta_member_monitor(
2104        &self,
2105        init_leader_addr: http::Uri,
2106        members: Either<MetaMemberClient, MetaMemberGroup>,
2107        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2108        meta_config: MetaConfig,
2109    ) -> Result<()> {
2110        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2111        let current_leader = init_leader_addr;
2112
2113        let enable_period_tick = matches!(members, Either::Right(_));
2114
2115        let member_management = MetaMemberManagement {
2116            core_ref,
2117            members,
2118            current_leader,
2119            meta_config,
2120        };
2121
2122        let mut force_refresh_receiver = force_refresh_receiver;
2123
2124        tokio::spawn(async move {
2125            let mut member_management = member_management;
2126            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2127
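            // Refresh the members either on the periodic tick (enabled only when a member list
            // is configured) or when a force-refresh request arrives.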
2128            loop {
2129                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2130                    tokio::select! {
2131                        _ = ticker.tick() => None,
2132                        result_sender = force_refresh_receiver.recv() => {
2133                            if result_sender.is_none() {
2134                                break;
2135                            }
2136
2137                            result_sender
2138                        },
2139                    }
2140                } else {
2141                    let result_sender = force_refresh_receiver.recv().await;
2142
2143                    if result_sender.is_none() {
2144                        break;
2145                    }
2146
2147                    result_sender
2148                };
2149
2150                let tick_result = member_management.refresh_members().await;
2151                if let Err(e) = tick_result.as_ref() {
2152                    tracing::warn!(error = %e.as_report(),  "refresh meta member client failed");
2153                }
2154
2155                if let Some(sender) = event {
2156                    // Ignore the send result: the requester may have dropped the receiver.
2157                    let _resp = sender.send(tick_result);
2158                }
2159            }
2160        });
2161
2162        Ok(())
2163    }
2164
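    /// Ask the member monitor task to refresh the meta members right away and wait for the
    /// result of that refresh.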
2165    async fn force_refresh_leader(&self) -> Result<()> {
2166        let (sender, receiver) = oneshot::channel();
2167
2168        self.member_monitor_event_sender
2169            .send(sender)
2170            .await
2171            .map_err(|e| anyhow!(e))?;
2172
2173        receiver.await.map_err(|e| anyhow!(e))?
2174    }
2175
2176    /// Connect to the meta server according to the given address `strategy`.
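    ///
    /// A minimal sketch (illustrative only; the address is an example and `MetaConfig::default()`
    /// is assumed to be an acceptable configuration):
    /// ```ignore
    /// let strategy = MetaAddressStrategy::List(vec!["http://127.0.0.1:5690".parse()?]);
    /// let client = GrpcMetaClient::new(&strategy, MetaConfig::default()).await?;
    /// ```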
2177    pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result<Self> {
2178        let (channel, addr) = match strategy {
2179            MetaAddressStrategy::LoadBalance(addr) => {
2180                Self::try_build_rpc_channel(vec![addr.clone()]).await
2181            }
2182            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2183        }?;
2184        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2185        let client = GrpcMetaClient {
2186            member_monitor_event_sender: force_refresh_sender,
2187            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2188        };
2189
2190        let meta_member_client = client.core.read().await.meta_member_client.clone();
2191        let members = match strategy {
2192            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2193            MetaAddressStrategy::List(addrs) => {
2194                let mut members = LruCache::new(20);
2195                for addr in addrs {
2196                    members.put(addr.clone(), None);
2197                }
2198                members.put(addr.clone(), Some(meta_member_client));
2199
2200                Either::Right(MetaMemberGroup { members })
2201            }
2202        };
2203
2204        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config)?;
2205
2206        client.force_refresh_leader().await?;
2207
2208        Ok(client)
2209    }
2210
2211    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2212        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2213    }
2214
2215    pub(crate) async fn try_build_rpc_channel(
2216        addrs: impl IntoIterator<Item = http::Uri>,
2217    ) -> Result<(Channel, http::Uri)> {
2218        let endpoints: Vec<_> = addrs
2219            .into_iter()
2220            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2221            .collect();
2222
2223        let mut last_error = None;
2224
2225        for (endpoint, addr) in endpoints {
2226            match Self::connect_to_endpoint(endpoint).await {
2227                Ok(channel) => {
2228                    tracing::info!("Connect to meta server {} successfully", addr);
2229                    return Ok((channel, addr));
2230                }
2231                Err(e) => {
2232                    tracing::warn!(
2233                        error = %e.as_report(),
2234                        "Failed to connect to meta server {}, trying again",
2235                        addr,
2236                    );
2237                    last_error = Some(e);
2238                }
2239            }
2240        }
2241
2242        if let Some(last_error) = last_error {
2243            Err(anyhow::anyhow!(last_error)
2244                .context("failed to connect to all meta servers")
2245                .into())
2246        } else {
2247            bail!("no meta server address provided")
2248        }
2249    }
2250
2251    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2252        let channel = endpoint
2253            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2254            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2255            .connect_timeout(Duration::from_secs(5))
2256            .monitored_connect("grpc-meta-client", Default::default())
2257            .await?
2258            .wrapped();
2259
2260        Ok(channel)
2261    }
2262
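    /// Build the retry strategy used when (re)connecting to the meta server: a jittered
    /// exponential backoff starting from `INIT_RETRY_BASE_INTERVAL_MS` and capped at
    /// `INIT_RETRY_MAX_INTERVAL_MS` per attempt, which stops once the cumulative delay reaches
    /// `high_bound`. When `exceed` is true, the attempt that crosses the bound is still yielded.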
2263    pub(crate) fn retry_strategy_to_bound(
2264        high_bound: Duration,
2265        exceed: bool,
2266    ) -> impl Iterator<Item = Duration> {
2267        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2268            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2269            .map(jitter);
2270
2271        let mut sum = Duration::default();
2272
2273        iter.take_while(move |duration| {
2274            sum += *duration;
2275
2276            if exceed {
2277                sum < high_bound + *duration
2278            } else {
2279                sum < high_bound
2280            }
2281        })
2282    }
2283}
2284
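/// Enumerates every meta RPC as a `{ client_field, method, Request, Response }` tuple and passes
/// the list to the given macro. Together with `meta_rpc_client_method_impl` (applied below), this
/// generates one forwarding method on [`GrpcMetaClient`] per RPC, roughly of the shape sketched
/// here (an illustration of the generated code, not its exact expansion):
///
/// ```ignore
/// pub async fn flush(&self, request: FlushRequest) -> Result<FlushResponse> {
///     let mut client = self.core.read().await.stream_client.clone();
///     // Issue the RPC; on certain error codes, call `refresh_client_if_needed` before
///     // returning the error.
///     Ok(client.flush(request).await?.into_inner())
/// }
/// ```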
2285macro_rules! for_all_meta_rpc {
2286    ($macro:ident) => {
2287        $macro! {
2288             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2289            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2290            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2291            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2292            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2293            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2294            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2295            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2296            ,{ stream_client, flush, FlushRequest, FlushResponse }
2297            ,{ stream_client, pause, PauseRequest, PauseResponse }
2298            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2299            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2300            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2301            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2302            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2303            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2304            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2305            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2306            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2307            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
2308            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2309            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2310            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2311            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2312            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2313            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2314            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2315            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2316            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2317            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2318            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2319            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2320            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2321            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2322            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2323            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2324            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2325            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2326            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2327            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2328            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2329            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2330            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2331            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2332            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2333            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2334            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2335            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2336            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2337            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2338            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2339            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2340            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2341            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2342            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2343            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2344            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2345            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2346            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2347            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2348            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2349            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2350            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2351            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2352            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2353            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2354            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2355            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2356            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2357            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2358            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2359            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2360            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2361            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2362            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2363            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2364            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2365            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2366            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2367            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2368            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2369            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2370            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2371            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2372            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2373            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2374            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2375            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2376            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2377            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2378            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2379            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2380            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2381            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2382            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2383            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2384            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2385            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2386            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2387            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2388            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2389            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2390            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2391            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2392            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2393            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2394            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2395            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2396            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2397            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2398            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2399            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2400            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2401            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2402            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2403            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2404            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2405            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2406            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2407            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2408            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2409            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2410        }
2411    };
2412}
2413
2414impl GrpcMetaClient {
2415    async fn refresh_client_if_needed(&self, code: Code) {
2416        if matches!(
2417            code,
2418            Code::Unknown | Code::Unimplemented | Code::Unavailable
2419        ) {
2420            tracing::debug!("matching tonic code {}", code);
2421            let (result_sender, result_receiver) = oneshot::channel();
2422            if self
2423                .member_monitor_event_sender
2424                .try_send(result_sender)
2425                .is_ok()
2426            {
2427                if let Ok(Err(e)) = result_receiver.await {
2428                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2429                }
2430            } else {
2431                tracing::debug!("skipping the current refresh, somewhere else is already doing it")
2432            }
2433        }
2434    }
2435}
2436
2437impl GrpcMetaClient {
2438    for_all_meta_rpc! { meta_rpc_client_method_impl }
2439}