risingwave_rpc_client/meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, SecretId, TableId};
33use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
34use risingwave_common::hash::WorkerSlotMapping;
35use risingwave_common::monitor::EndpointExt;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::telemetry::report::TelemetryInfoFetcher;
38use risingwave_common::util::addr::HostAddr;
39use risingwave_common::util::column_index_mapping::ColIndexMapping;
40use risingwave_common::util::meta_addr::MetaAddressStrategy;
41use risingwave_common::util::resource_util::cpu::total_cpu_available;
42use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
43use risingwave_error::bail;
44use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
45use risingwave_hummock_sdk::compaction_group::StateTableId;
46use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
47use risingwave_hummock_sdk::{
48    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
49};
50use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
51use risingwave_pb::backup_service::*;
52use risingwave_pb::catalog::{
53    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
54    PbSubscription, PbTable, PbView, Table,
55};
56use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
57use risingwave_pb::cloud_service::*;
58use risingwave_pb::common::worker_node::Property;
59use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
60use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
61use risingwave_pb::ddl_service::alter_owner_request::Object;
62use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
63use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
64use risingwave_pb::ddl_service::drop_table_request::SourceId;
65use risingwave_pb::ddl_service::*;
66use risingwave_pb::hummock::compact_task::TaskStatus;
67use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
68use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
69use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
70use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
71use risingwave_pb::hummock::write_limits::WriteLimit;
72use risingwave_pb::hummock::*;
73use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
74use risingwave_pb::iceberg_compaction::{
75    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
76    subscribe_iceberg_compaction_event_request,
77};
78use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
79use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
80use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
81use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
82use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
83use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
84use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
85use risingwave_pb::meta::list_actor_states_response::ActorState;
86use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
87use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
88use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
89use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
90use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
91use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
92use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
93use risingwave_pb::meta::serving_service_client::ServingServiceClient;
94use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
95use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
96use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
97use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
98use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
99use risingwave_pb::meta::{FragmentDistribution, *};
100use risingwave_pb::secret::PbSecretRef;
101use risingwave_pb::stream_plan::StreamFragmentGraph;
102use risingwave_pb::user::update_user_request::UpdateField;
103use risingwave_pb::user::user_service_client::UserServiceClient;
104use risingwave_pb::user::*;
105use thiserror_ext::AsReport;
106use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
107use tokio::sync::oneshot::Sender;
108use tokio::sync::{RwLock, mpsc, oneshot};
109use tokio::task::JoinHandle;
110use tokio::time::{self};
111use tokio_retry::strategy::{ExponentialBackoff, jitter};
112use tokio_stream::wrappers::UnboundedReceiverStream;
113use tonic::transport::Endpoint;
114use tonic::{Code, Request, Streaming};
115
116use crate::channel::{Channel, WrappedChannelExt};
117use crate::error::{Result, RpcError};
118use crate::hummock_meta_client::{
119    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
120    IcebergCompactionEventItem,
121};
122use crate::meta_rpc_client_method_impl;
123
124type ConnectionId = u32;
125type DatabaseId = u32;
126type SchemaId = u32;
127
128/// Client to the meta server. Cloning the instance is lightweight.
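///
/// # Example
///
/// A minimal sketch (not a runnable doctest) of what the cheap clone enables: moving a
/// clone into a background task while the original stays in use. `meta_client` is
/// assumed to be an already-registered `MetaClient`.
///
/// ```ignore
/// let client = meta_client.clone();
/// tokio::spawn(async move {
///     // Poll DDL progress from another task using the cloned handle.
///     let _progress = client.get_ddl_progress().await;
/// });
/// ```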
129#[derive(Clone, Debug)]
130pub struct MetaClient {
131    worker_id: u32,
132    worker_type: WorkerType,
133    host_addr: HostAddr,
134    inner: GrpcMetaClient,
135    meta_config: MetaConfig,
136    cluster_id: String,
137    shutting_down: Arc<AtomicBool>,
138}
139
140impl MetaClient {
141    pub fn worker_id(&self) -> u32 {
142        self.worker_id
143    }
144
145    pub fn host_addr(&self) -> &HostAddr {
146        &self.host_addr
147    }
148
149    pub fn worker_type(&self) -> WorkerType {
150        self.worker_type
151    }
152
153    pub fn cluster_id(&self) -> &str {
154        &self.cluster_id
155    }
156
157    /// Subscribe to notifications from the meta service.
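    ///
    /// # Example
    ///
    /// A consumption sketch (not a runnable doctest); `meta_client` is assumed to be a
    /// registered client and the subscribe type is only illustrative.
    ///
    /// ```ignore
    /// let mut stream = meta_client.subscribe(SubscribeType::Frontend).await?;
    /// // `Streaming` yields one `SubscribeResponse` per notification.
    /// while let Some(notification) = stream.message().await? {
    ///     // Apply the catalog / worker / hummock update here.
    ///     let _ = notification;
    /// }
    /// ```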
158    pub async fn subscribe(
159        &self,
160        subscribe_type: SubscribeType,
161    ) -> Result<Streaming<SubscribeResponse>> {
162        let request = SubscribeRequest {
163            subscribe_type: subscribe_type as i32,
164            host: Some(self.host_addr.to_protobuf()),
165            worker_id: self.worker_id(),
166        };
167
168        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
169            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
170            true,
171        );
172
173        tokio_retry::Retry::spawn(retry_strategy, || async {
174            let request = request.clone();
175            self.inner.subscribe(request).await
176        })
177        .await
178    }
179
180    pub async fn create_connection(
181        &self,
182        connection_name: String,
183        database_id: u32,
184        schema_id: u32,
185        owner_id: u32,
186        req: create_connection_request::Payload,
187    ) -> Result<WaitVersion> {
188        let request = CreateConnectionRequest {
189            name: connection_name,
190            database_id,
191            schema_id,
192            owner_id,
193            payload: Some(req),
194        };
195        let resp = self.inner.create_connection(request).await?;
196        Ok(resp
197            .version
198            .ok_or_else(|| anyhow!("wait version not set"))?)
199    }
200
201    pub async fn create_secret(
202        &self,
203        secret_name: String,
204        database_id: u32,
205        schema_id: u32,
206        owner_id: u32,
207        value: Vec<u8>,
208    ) -> Result<WaitVersion> {
209        let request = CreateSecretRequest {
210            name: secret_name,
211            database_id,
212            schema_id,
213            owner_id,
214            value,
215        };
216        let resp = self.inner.create_secret(request).await?;
217        Ok(resp
218            .version
219            .ok_or_else(|| anyhow!("wait version not set"))?)
220    }
221
222    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
223        let request = ListConnectionsRequest {};
224        let resp = self.inner.list_connections(request).await?;
225        Ok(resp.connections)
226    }
227
228    pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result<WaitVersion> {
229        let request = DropConnectionRequest { connection_id };
230        let resp = self.inner.drop_connection(request).await?;
231        Ok(resp
232            .version
233            .ok_or_else(|| anyhow!("wait version not set"))?)
234    }
235
236    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
237        let request = DropSecretRequest {
238            secret_id: secret_id.into(),
239        };
240        let resp = self.inner.drop_secret(request).await?;
241        Ok(resp
242            .version
243            .ok_or_else(|| anyhow!("wait version not set"))?)
244    }
245
246    /// Register the current node with the cluster and set the corresponding worker id.
247    ///
248    /// Retries if there is a connection issue with the meta node; exits the process if registration fails.
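    ///
    /// # Example
    ///
    /// A registration sketch (not a runnable doctest); `addr_strategy`, `host_addr` and
    /// `meta_config` are assumed to be constructed elsewhere.
    ///
    /// ```ignore
    /// let (meta_client, system_params) = MetaClient::register_new(
    ///     addr_strategy,
    ///     WorkerType::ComputeNode,
    ///     &host_addr,
    ///     Property::default(),
    ///     &meta_config,
    /// )
    /// .await;
    /// // Confirm the node is ready to serve.
    /// meta_client.activate(&host_addr).await?;
    /// ```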
249    pub async fn register_new(
250        addr_strategy: MetaAddressStrategy,
251        worker_type: WorkerType,
252        addr: &HostAddr,
253        property: Property,
254        meta_config: &MetaConfig,
255    ) -> (Self, SystemParamsReader) {
256        let ret =
257            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
258
259        match ret {
260            Ok(ret) => ret,
261            Err(err) => {
262                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
263                std::process::exit(1);
264            }
265        }
266    }
267
268    async fn register_new_inner(
269        addr_strategy: MetaAddressStrategy,
270        worker_type: WorkerType,
271        addr: &HostAddr,
272        property: Property,
273        meta_config: &MetaConfig,
274    ) -> Result<(Self, SystemParamsReader)> {
275        tracing::info!("register meta client using strategy: {}", addr_strategy);
276
277        // Retry until reaching `max_heartbeat_interval_secs`
278        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
279            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
280            true,
281        );
282
283        if property.is_unschedulable {
284            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
285        }
286        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
287            retry_strategy,
288            || async {
289                let grpc_meta_client =
290                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
291
292                let add_worker_resp = grpc_meta_client
293                    .add_worker_node(AddWorkerNodeRequest {
294                        worker_type: worker_type as i32,
295                        host: Some(addr.to_protobuf()),
296                        property: Some(property.clone()),
297                        resource: Some(risingwave_pb::common::worker_node::Resource {
298                            rw_version: RW_VERSION.to_owned(),
299                            total_memory_bytes: system_memory_available_bytes() as _,
300                            total_cpu_cores: total_cpu_available() as _,
301                        }),
302                    })
303                    .await
304                    .context("failed to add worker node")?;
305
306                let system_params_resp = grpc_meta_client
307                    .get_system_params(GetSystemParamsRequest {})
308                    .await
309                    .context("failed to get initial system params")?;
310
311                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
312            },
313            // Only retry on transient connection issues.
314            // If the error originates from our own implementation or business logic, do not retry.
315            |e: &RpcError| !e.is_from_tonic_server_impl(),
316        )
317        .await;
318
319        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
320        let worker_id = add_worker_resp
321            .node_id
322            .expect("AddWorkerNodeResponse::node_id is empty");
323
324        let meta_client = Self {
325            worker_id,
326            worker_type,
327            host_addr: addr.clone(),
328            inner: grpc_meta_client,
329            meta_config: meta_config.to_owned(),
330            cluster_id: add_worker_resp.cluster_id,
331            shutting_down: Arc::new(false.into()),
332        };
333
334        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
335        REPORT_PANIC.call_once(|| {
336            let meta_client_clone = meta_client.clone();
337            std::panic::update_hook(move |default_hook, info| {
338                // Try to report panic event to meta node.
339                meta_client_clone.try_add_panic_event_blocking(info, None);
340                default_hook(info);
341            });
342        });
343
344        Ok((meta_client, system_params_resp.params.unwrap().into()))
345    }
346
347    /// Activate the current node in the cluster to confirm it's ready to serve.
348    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
349        let request = ActivateWorkerNodeRequest {
350            host: Some(addr.to_protobuf()),
351            node_id: self.worker_id,
352        };
353        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
354            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
355            true,
356        );
357        tokio_retry::Retry::spawn(retry_strategy, || async {
358            let request = request.clone();
359            self.inner.activate_worker_node(request).await
360        })
361        .await?;
362
363        Ok(())
364    }
365
366    /// Send a heartbeat signal to the meta service.
367    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
368        let request = HeartbeatRequest { node_id };
369        let resp = self.inner.heartbeat(request).await?;
370        if let Some(status) = resp.status {
371            if status.code() == risingwave_pb::common::status::Code::UnknownWorker {
372                // Ignore the error if we're already shutting down.
373                // Otherwise, exit the process.
374                if !self.shutting_down.load(Relaxed) {
375                    tracing::error!(message = status.message, "worker expired");
376                    std::process::exit(1);
377                }
378            }
379        }
380        Ok(())
381    }
382
383    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
384        let request = CreateDatabaseRequest { db: Some(db) };
385        let resp = self.inner.create_database(request).await?;
386        // TODO: handle error in `resp.status` here
387        Ok(resp
388            .version
389            .ok_or_else(|| anyhow!("wait version not set"))?)
390    }
391
392    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
393        let request = CreateSchemaRequest {
394            schema: Some(schema),
395        };
396        let resp = self.inner.create_schema(request).await?;
397        // TODO: handle error in `resp.status` here
398        Ok(resp
399            .version
400            .ok_or_else(|| anyhow!("wait version not set"))?)
401    }
402
403    pub async fn create_materialized_view(
404        &self,
405        table: PbTable,
406        graph: StreamFragmentGraph,
407        dependencies: HashSet<ObjectId>,
408        specific_resource_group: Option<String>,
409    ) -> Result<WaitVersion> {
410        let request = CreateMaterializedViewRequest {
411            materialized_view: Some(table),
412            fragment_graph: Some(graph),
413            backfill: PbBackfillType::Regular as _,
414            dependencies: dependencies.into_iter().collect(),
415            specific_resource_group,
416        };
417        let resp = self.inner.create_materialized_view(request).await?;
418        // TODO: handle error in `resp.status` here
419        Ok(resp
420            .version
421            .ok_or_else(|| anyhow!("wait version not set"))?)
422    }
423
424    pub async fn drop_materialized_view(
425        &self,
426        table_id: TableId,
427        cascade: bool,
428    ) -> Result<WaitVersion> {
429        let request = DropMaterializedViewRequest {
430            table_id: table_id.table_id(),
431            cascade,
432        };
433
434        let resp = self.inner.drop_materialized_view(request).await?;
435        Ok(resp
436            .version
437            .ok_or_else(|| anyhow!("wait version not set"))?)
438    }
439
440    pub async fn create_source(
441        &self,
442        source: PbSource,
443        graph: Option<StreamFragmentGraph>,
444    ) -> Result<WaitVersion> {
445        let request = CreateSourceRequest {
446            source: Some(source),
447            fragment_graph: graph,
448        };
449
450        let resp = self.inner.create_source(request).await?;
451        Ok(resp
452            .version
453            .ok_or_else(|| anyhow!("wait version not set"))?)
454    }
455
456    pub async fn create_sink(
457        &self,
458        sink: PbSink,
459        graph: StreamFragmentGraph,
460        affected_table_change: Option<ReplaceJobPlan>,
461        dependencies: HashSet<ObjectId>,
462    ) -> Result<WaitVersion> {
463        let request = CreateSinkRequest {
464            sink: Some(sink),
465            fragment_graph: Some(graph),
466            affected_table_change,
467            dependencies: dependencies.into_iter().collect(),
468        };
469
470        let resp = self.inner.create_sink(request).await?;
471        Ok(resp
472            .version
473            .ok_or_else(|| anyhow!("wait version not set"))?)
474    }
475
476    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
477        let request = CreateSubscriptionRequest {
478            subscription: Some(subscription),
479        };
480
481        let resp = self.inner.create_subscription(request).await?;
482        Ok(resp
483            .version
484            .ok_or_else(|| anyhow!("wait version not set"))?)
485    }
486
487    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
488        let request = CreateFunctionRequest {
489            function: Some(function),
490        };
491        let resp = self.inner.create_function(request).await?;
492        Ok(resp
493            .version
494            .ok_or_else(|| anyhow!("wait version not set"))?)
495    }
496
497    pub async fn create_table(
498        &self,
499        source: Option<PbSource>,
500        table: PbTable,
501        graph: StreamFragmentGraph,
502        job_type: PbTableJobType,
503    ) -> Result<WaitVersion> {
504        let request = CreateTableRequest {
505            materialized_view: Some(table),
506            fragment_graph: Some(graph),
507            source,
508            job_type: job_type as _,
509        };
510        let resp = self.inner.create_table(request).await?;
511        // TODO: handle error in `resp.status` here
512        Ok(resp
513            .version
514            .ok_or_else(|| anyhow!("wait version not set"))?)
515    }
516
517    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
518        let request = CommentOnRequest {
519            comment: Some(comment),
520        };
521        let resp = self.inner.comment_on(request).await?;
522        Ok(resp
523            .version
524            .ok_or_else(|| anyhow!("wait version not set"))?)
525    }
526
527    pub async fn alter_name(
528        &self,
529        object: alter_name_request::Object,
530        name: &str,
531    ) -> Result<WaitVersion> {
532        let request = AlterNameRequest {
533            object: Some(object),
534            new_name: name.to_owned(),
535        };
536        let resp = self.inner.alter_name(request).await?;
537        Ok(resp
538            .version
539            .ok_or_else(|| anyhow!("wait version not set"))?)
540    }
541
542    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
543        let request = AlterOwnerRequest {
544            object: Some(object),
545            owner_id,
546        };
547        let resp = self.inner.alter_owner(request).await?;
548        Ok(resp
549            .version
550            .ok_or_else(|| anyhow!("wait version not set"))?)
551    }
552
553    pub async fn alter_set_schema(
554        &self,
555        object: alter_set_schema_request::Object,
556        new_schema_id: u32,
557    ) -> Result<WaitVersion> {
558        let request = AlterSetSchemaRequest {
559            new_schema_id,
560            object: Some(object),
561        };
562        let resp = self.inner.alter_set_schema(request).await?;
563        Ok(resp
564            .version
565            .ok_or_else(|| anyhow!("wait version not set"))?)
566    }
567
568    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
569        let request = AlterSourceRequest {
570            source: Some(source),
571        };
572        let resp = self.inner.alter_source(request).await?;
573        Ok(resp
574            .version
575            .ok_or_else(|| anyhow!("wait version not set"))?)
576    }
577
578    pub async fn alter_parallelism(
579        &self,
580        table_id: u32,
581        parallelism: PbTableParallelism,
582        deferred: bool,
583    ) -> Result<()> {
584        let request = AlterParallelismRequest {
585            table_id,
586            parallelism: Some(parallelism),
587            deferred,
588        };
589
590        self.inner.alter_parallelism(request).await?;
591        Ok(())
592    }
593
594    pub async fn alter_resource_group(
595        &self,
596        table_id: u32,
597        resource_group: Option<String>,
598        deferred: bool,
599    ) -> Result<()> {
600        let request = AlterResourceGroupRequest {
601            table_id,
602            resource_group,
603            deferred,
604        };
605
606        self.inner.alter_resource_group(request).await?;
607        Ok(())
608    }
609
610    pub async fn alter_swap_rename(
611        &self,
612        object: alter_swap_rename_request::Object,
613    ) -> Result<WaitVersion> {
614        let request = AlterSwapRenameRequest {
615            object: Some(object),
616        };
617        let resp = self.inner.alter_swap_rename(request).await?;
618        Ok(resp
619            .version
620            .ok_or_else(|| anyhow!("wait version not set"))?)
621    }
622
623    pub async fn alter_secret(
624        &self,
625        secret_id: u32,
626        secret_name: String,
627        database_id: u32,
628        schema_id: u32,
629        owner_id: u32,
630        value: Vec<u8>,
631    ) -> Result<WaitVersion> {
632        let request = AlterSecretRequest {
633            secret_id,
634            name: secret_name,
635            database_id,
636            schema_id,
637            owner_id,
638            value,
639        };
640        let resp = self.inner.alter_secret(request).await?;
641        Ok(resp
642            .version
643            .ok_or_else(|| anyhow!("wait version not set"))?)
644    }
645
646    pub async fn replace_job(
647        &self,
648        graph: StreamFragmentGraph,
649        table_col_index_mapping: ColIndexMapping,
650        replace_job: ReplaceJob,
651    ) -> Result<WaitVersion> {
652        let request = ReplaceJobPlanRequest {
653            plan: Some(ReplaceJobPlan {
654                fragment_graph: Some(graph),
655                table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()),
656                replace_job: Some(replace_job),
657            }),
658        };
659        let resp = self.inner.replace_job_plan(request).await?;
660        // TODO: handle error in `resp.status` here
661        Ok(resp
662            .version
663            .ok_or_else(|| anyhow!("wait version not set"))?)
664    }
665
666    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
667        let request = AutoSchemaChangeRequest {
668            schema_change: Some(schema_change),
669        };
670        let _ = self.inner.auto_schema_change(request).await?;
671        Ok(())
672    }
673
674    pub async fn create_view(&self, view: PbView) -> Result<WaitVersion> {
675        let request = CreateViewRequest { view: Some(view) };
676        let resp = self.inner.create_view(request).await?;
677        // TODO: handle error in `resp.status` here
678        Ok(resp
679            .version
680            .ok_or_else(|| anyhow!("wait version not set"))?)
681    }
682
683    pub async fn create_index(
684        &self,
685        index: PbIndex,
686        table: PbTable,
687        graph: StreamFragmentGraph,
688    ) -> Result<WaitVersion> {
689        let request = CreateIndexRequest {
690            index: Some(index),
691            index_table: Some(table),
692            fragment_graph: Some(graph),
693        };
694        let resp = self.inner.create_index(request).await?;
695        // TODO: handle error in `resp.status` here
696        Ok(resp
697            .version
698            .ok_or_else(|| anyhow!("wait version not set"))?)
699    }
700
701    pub async fn drop_table(
702        &self,
703        source_id: Option<u32>,
704        table_id: TableId,
705        cascade: bool,
706    ) -> Result<WaitVersion> {
707        let request = DropTableRequest {
708            source_id: source_id.map(SourceId::Id),
709            table_id: table_id.table_id(),
710            cascade,
711        };
712
713        let resp = self.inner.drop_table(request).await?;
714        Ok(resp
715            .version
716            .ok_or_else(|| anyhow!("wait version not set"))?)
717    }
718
719    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
720        let request = DropViewRequest { view_id, cascade };
721        let resp = self.inner.drop_view(request).await?;
722        Ok(resp
723            .version
724            .ok_or_else(|| anyhow!("wait version not set"))?)
725    }
726
727    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
728        let request = DropSourceRequest { source_id, cascade };
729        let resp = self.inner.drop_source(request).await?;
730        Ok(resp
731            .version
732            .ok_or_else(|| anyhow!("wait version not set"))?)
733    }
734
735    pub async fn drop_sink(
736        &self,
737        sink_id: u32,
738        cascade: bool,
739        affected_table_change: Option<ReplaceJobPlan>,
740    ) -> Result<WaitVersion> {
741        let request = DropSinkRequest {
742            sink_id,
743            cascade,
744            affected_table_change,
745        };
746        let resp = self.inner.drop_sink(request).await?;
747        Ok(resp
748            .version
749            .ok_or_else(|| anyhow!("wait version not set"))?)
750    }
751
752    pub async fn drop_subscription(
753        &self,
754        subscription_id: u32,
755        cascade: bool,
756    ) -> Result<WaitVersion> {
757        let request = DropSubscriptionRequest {
758            subscription_id,
759            cascade,
760        };
761        let resp = self.inner.drop_subscription(request).await?;
762        Ok(resp
763            .version
764            .ok_or_else(|| anyhow!("wait version not set"))?)
765    }
766
767    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
768        let request = DropIndexRequest {
769            index_id: index_id.index_id,
770            cascade,
771        };
772        let resp = self.inner.drop_index(request).await?;
773        Ok(resp
774            .version
775            .ok_or_else(|| anyhow!("wait version not set"))?)
776    }
777
778    pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
779        let request = DropFunctionRequest {
780            function_id: function_id.0,
781        };
782        let resp = self.inner.drop_function(request).await?;
783        Ok(resp
784            .version
785            .ok_or_else(|| anyhow!("wait version not set"))?)
786    }
787
788    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
789        let request = DropDatabaseRequest { database_id };
790        let resp = self.inner.drop_database(request).await?;
791        Ok(resp
792            .version
793            .ok_or_else(|| anyhow!("wait version not set"))?)
794    }
795
796    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
797        let request = DropSchemaRequest { schema_id, cascade };
798        let resp = self.inner.drop_schema(request).await?;
799        Ok(resp
800            .version
801            .ok_or_else(|| anyhow!("wait version not set"))?)
802    }
803
804    // TODO: use UserInfoVersion as the return type instead.
805    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
806        let request = CreateUserRequest { user: Some(user) };
807        let resp = self.inner.create_user(request).await?;
808        Ok(resp.version)
809    }
810
811    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
812        let request = DropUserRequest { user_id };
813        let resp = self.inner.drop_user(request).await?;
814        Ok(resp.version)
815    }
816
817    pub async fn update_user(
818        &self,
819        user: UserInfo,
820        update_fields: Vec<UpdateField>,
821    ) -> Result<u64> {
822        let request = UpdateUserRequest {
823            user: Some(user),
824            update_fields: update_fields
825                .into_iter()
826                .map(|field| field as i32)
827                .collect::<Vec<_>>(),
828        };
829        let resp = self.inner.update_user(request).await?;
830        Ok(resp.version)
831    }
832
833    pub async fn grant_privilege(
834        &self,
835        user_ids: Vec<u32>,
836        privileges: Vec<GrantPrivilege>,
837        with_grant_option: bool,
838        granted_by: u32,
839    ) -> Result<u64> {
840        let request = GrantPrivilegeRequest {
841            user_ids,
842            privileges,
843            with_grant_option,
844            granted_by,
845        };
846        let resp = self.inner.grant_privilege(request).await?;
847        Ok(resp.version)
848    }
849
850    pub async fn revoke_privilege(
851        &self,
852        user_ids: Vec<u32>,
853        privileges: Vec<GrantPrivilege>,
854        granted_by: u32,
855        revoke_by: u32,
856        revoke_grant_option: bool,
857        cascade: bool,
858    ) -> Result<u64> {
859        let request = RevokePrivilegeRequest {
860            user_ids,
861            privileges,
862            granted_by,
863            revoke_by,
864            revoke_grant_option,
865            cascade,
866        };
867        let resp = self.inner.revoke_privilege(request).await?;
868        Ok(resp.version)
869    }
870
871    /// Unregister the current node from the cluster.
872    pub async fn unregister(&self) -> Result<()> {
873        let request = DeleteWorkerNodeRequest {
874            host: Some(self.host_addr.to_protobuf()),
875        };
876        self.inner.delete_worker_node(request).await?;
877        self.shutting_down.store(true, Relaxed);
878        Ok(())
879    }
880
881    /// Try to unregister the current worker from the cluster on a best-effort basis and log the result.
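    ///
    /// # Example
    ///
    /// A shutdown sketch (not a runnable doctest): stop the heartbeat loop first, then
    /// unregister. `heartbeat_shutdown` and `heartbeat_handle` are assumed to come from
    /// `start_heartbeat_loop`.
    ///
    /// ```ignore
    /// let _ = heartbeat_shutdown.send(());
    /// let _ = heartbeat_handle.await;
    /// meta_client.try_unregister().await;
    /// ```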
882    pub async fn try_unregister(&self) {
883        match self.unregister().await {
884            Ok(_) => {
885                tracing::info!(
886                    worker_id = self.worker_id(),
887                    "successfully unregistered from meta service",
888                )
889            }
890            Err(e) => {
891                tracing::warn!(
892                    error = %e.as_report(),
893                    worker_id = self.worker_id(),
894                    "failed to unregister from meta service",
895                );
896            }
897        }
898    }
899
900    pub async fn update_schedulability(
901        &self,
902        worker_ids: &[u32],
903        schedulability: Schedulability,
904    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
905        let request = UpdateWorkerNodeSchedulabilityRequest {
906            worker_ids: worker_ids.to_vec(),
907            schedulability: schedulability.into(),
908        };
909        let resp = self
910            .inner
911            .update_worker_node_schedulability(request)
912            .await?;
913        Ok(resp)
914    }
915
916    pub async fn list_worker_nodes(
917        &self,
918        worker_type: Option<WorkerType>,
919    ) -> Result<Vec<WorkerNode>> {
920        let request = ListAllNodesRequest {
921            worker_type: worker_type.map(Into::into),
922            include_starting_nodes: true,
923        };
924        let resp = self.inner.list_all_nodes(request).await?;
925        Ok(resp.nodes)
926    }
927
928    /// Starts a background heartbeat worker, returning its join handle and a shutdown sender.
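    ///
    /// # Example
    ///
    /// A usage sketch (not a runnable doctest); the one-second interval is illustrative
    /// only and `meta_client` is assumed to be an already-registered client.
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    /// // ... later, during shutdown ...
    /// let _ = shutdown_tx.send(());
    /// let _ = join_handle.await;
    /// ```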
929    pub fn start_heartbeat_loop(
930        meta_client: MetaClient,
931        min_interval: Duration,
932    ) -> (JoinHandle<()>, Sender<()>) {
933        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
934        let join_handle = tokio::spawn(async move {
935            let mut min_interval_ticker = tokio::time::interval(min_interval);
936            loop {
937                tokio::select! {
938                    biased;
939                    // Shutdown
940                    _ = &mut shutdown_rx => {
941                        tracing::info!("Heartbeat loop is stopped");
942                        return;
943                    }
944                    // Wait for interval
945                    _ = min_interval_ticker.tick() => {},
946                }
947                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
948                match tokio::time::timeout(
949                    // TODO: decide a better timeout than `min_interval * 3`
950                    min_interval * 3,
951                    meta_client.send_heartbeat(meta_client.worker_id()),
952                )
953                .await
954                {
955                    Ok(Ok(_)) => {}
956                    Ok(Err(err)) => {
957                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
958                    }
959                    Err(_) => {
960                        tracing::warn!("Failed to send_heartbeat: timeout");
961                    }
962                }
963            }
964        });
965        (join_handle, shutdown_tx)
966    }
967
968    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
969        let request = RisectlListStateTablesRequest {};
970        let resp = self.inner.risectl_list_state_tables(request).await?;
971        Ok(resp.tables)
972    }
973
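    /// Trigger a flush for the given database and return the resulting hummock version id.
    ///
    /// A usage sketch (not a runnable doctest); `database_id` is assumed to refer to an
    /// existing database.
    ///
    /// ```ignore
    /// let version_id = meta_client.flush(database_id).await?;
    /// ```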
974    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
975        let request = FlushRequest { database_id };
976        let resp = self.inner.flush(request).await?;
977        Ok(HummockVersionId::new(resp.hummock_version_id))
978    }
979
980    pub async fn wait(&self) -> Result<()> {
981        let request = WaitRequest {};
982        self.inner.wait(request).await?;
983        Ok(())
984    }
985
986    pub async fn recover(&self) -> Result<()> {
987        let request = RecoverRequest {};
988        self.inner.recover(request).await?;
989        Ok(())
990    }
991
992    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
993        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
994        let resp = self.inner.cancel_creating_jobs(request).await?;
995        Ok(resp.canceled_jobs)
996    }
997
998    pub async fn list_table_fragments(
999        &self,
1000        table_ids: &[u32],
1001    ) -> Result<HashMap<u32, TableFragmentInfo>> {
1002        let request = ListTableFragmentsRequest {
1003            table_ids: table_ids.to_vec(),
1004        };
1005        let resp = self.inner.list_table_fragments(request).await?;
1006        Ok(resp.table_fragments)
1007    }
1008
1009    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1010        let resp = self
1011            .inner
1012            .list_streaming_job_states(ListStreamingJobStatesRequest {})
1013            .await?;
1014        Ok(resp.states)
1015    }
1016
1017    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
1018        let resp = self
1019            .inner
1020            .list_fragment_distribution(ListFragmentDistributionRequest {})
1021            .await?;
1022        Ok(resp.distributions)
1023    }
1024
1025    pub async fn get_fragment_by_id(
1026        &self,
1027        fragment_id: u32,
1028    ) -> Result<Option<FragmentDistribution>> {
1029        let resp = self
1030            .inner
1031            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1032            .await?;
1033        Ok(resp.distribution)
1034    }
1035
1036    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1037        let resp = self
1038            .inner
1039            .list_actor_states(ListActorStatesRequest {})
1040            .await?;
1041        Ok(resp.states)
1042    }
1043
1044    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1045        let resp = self
1046            .inner
1047            .list_actor_splits(ListActorSplitsRequest {})
1048            .await?;
1049
1050        Ok(resp.actor_splits)
1051    }
1052
1053    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
1054        let resp = self
1055            .inner
1056            .list_object_dependencies(ListObjectDependenciesRequest {})
1057            .await?;
1058        Ok(resp.dependencies)
1059    }
1060
1061    pub async fn pause(&self) -> Result<PauseResponse> {
1062        let request = PauseRequest {};
1063        let resp = self.inner.pause(request).await?;
1064        Ok(resp)
1065    }
1066
1067    pub async fn resume(&self) -> Result<ResumeResponse> {
1068        let request = ResumeRequest {};
1069        let resp = self.inner.resume(request).await?;
1070        Ok(resp)
1071    }
1072
1073    pub async fn apply_throttle(
1074        &self,
1075        kind: PbThrottleTarget,
1076        id: u32,
1077        rate: Option<u32>,
1078    ) -> Result<ApplyThrottleResponse> {
1079        let request = ApplyThrottleRequest {
1080            kind: kind as i32,
1081            id,
1082            rate,
1083        };
1084        let resp = self.inner.apply_throttle(request).await?;
1085        Ok(resp)
1086    }
1087
1088    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1089        let resp = self
1090            .inner
1091            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1092            .await?;
1093        Ok(resp.get_status().unwrap())
1094    }
1095
1096    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1097        let request = GetClusterInfoRequest {};
1098        let resp = self.inner.get_cluster_info(request).await?;
1099        Ok(resp)
1100    }
1101
1102    pub async fn reschedule(
1103        &self,
1104        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1105        revision: u64,
1106        resolve_no_shuffle_upstream: bool,
1107    ) -> Result<(bool, u64)> {
1108        let request = RescheduleRequest {
1109            revision,
1110            resolve_no_shuffle_upstream,
1111            worker_reschedules,
1112        };
1113        let resp = self.inner.reschedule(request).await?;
1114        Ok((resp.success, resp.revision))
1115    }
1116
1117    pub async fn risectl_get_pinned_versions_summary(
1118        &self,
1119    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1120        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1121        self.inner
1122            .rise_ctl_get_pinned_versions_summary(request)
1123            .await
1124    }
1125
1126    pub async fn risectl_get_checkpoint_hummock_version(
1127        &self,
1128    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1129        let request = RiseCtlGetCheckpointVersionRequest {};
1130        self.inner.rise_ctl_get_checkpoint_version(request).await
1131    }
1132
1133    pub async fn risectl_pause_hummock_version_checkpoint(
1134        &self,
1135    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1136        let request = RiseCtlPauseVersionCheckpointRequest {};
1137        self.inner.rise_ctl_pause_version_checkpoint(request).await
1138    }
1139
1140    pub async fn risectl_resume_hummock_version_checkpoint(
1141        &self,
1142    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1143        let request = RiseCtlResumeVersionCheckpointRequest {};
1144        self.inner.rise_ctl_resume_version_checkpoint(request).await
1145    }
1146
1147    pub async fn init_metadata_for_replay(
1148        &self,
1149        tables: Vec<PbTable>,
1150        compaction_groups: Vec<CompactionGroupInfo>,
1151    ) -> Result<()> {
1152        let req = InitMetadataForReplayRequest {
1153            tables,
1154            compaction_groups,
1155        };
1156        let _resp = self.inner.init_metadata_for_replay(req).await?;
1157        Ok(())
1158    }
1159
1160    pub async fn replay_version_delta(
1161        &self,
1162        version_delta: HummockVersionDelta,
1163    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1164        let req = ReplayVersionDeltaRequest {
1165            version_delta: Some(version_delta.into()),
1166        };
1167        let resp = self.inner.replay_version_delta(req).await?;
1168        Ok((
1169            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1170            resp.modified_compaction_groups,
1171        ))
1172    }
1173
1174    pub async fn list_version_deltas(
1175        &self,
1176        start_id: HummockVersionId,
1177        num_limit: u32,
1178        committed_epoch_limit: HummockEpoch,
1179    ) -> Result<Vec<HummockVersionDelta>> {
1180        let req = ListVersionDeltasRequest {
1181            start_id: start_id.to_u64(),
1182            num_limit,
1183            committed_epoch_limit,
1184        };
1185        Ok(self
1186            .inner
1187            .list_version_deltas(req)
1188            .await?
1189            .version_deltas
1190            .unwrap()
1191            .version_deltas
1192            .iter()
1193            .map(HummockVersionDelta::from_rpc_protobuf)
1194            .collect())
1195    }
1196
1197    pub async fn trigger_compaction_deterministic(
1198        &self,
1199        version_id: HummockVersionId,
1200        compaction_groups: Vec<CompactionGroupId>,
1201    ) -> Result<()> {
1202        let req = TriggerCompactionDeterministicRequest {
1203            version_id: version_id.to_u64(),
1204            compaction_groups,
1205        };
1206        self.inner.trigger_compaction_deterministic(req).await?;
1207        Ok(())
1208    }
1209
1210    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1211        let req = DisableCommitEpochRequest {};
1212        Ok(HummockVersion::from_rpc_protobuf(
1213            &self
1214                .inner
1215                .disable_commit_epoch(req)
1216                .await?
1217                .current_version
1218                .unwrap(),
1219        ))
1220    }
1221
1222    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1223        let req = GetAssignedCompactTaskNumRequest {};
1224        let resp = self.inner.get_assigned_compact_task_num(req).await?;
1225        Ok(resp.num_tasks as usize)
1226    }
1227
1228    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1229        let req = RiseCtlListCompactionGroupRequest {};
1230        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1231        Ok(resp.compaction_groups)
1232    }
1233
1234    pub async fn risectl_update_compaction_config(
1235        &self,
1236        compaction_groups: &[CompactionGroupId],
1237        configs: &[MutableConfig],
1238    ) -> Result<()> {
1239        let req = RiseCtlUpdateCompactionConfigRequest {
1240            compaction_group_ids: compaction_groups.to_vec(),
1241            configs: configs
1242                .iter()
1243                .map(
1244                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1245                        mutable_config: Some(c.clone()),
1246                    },
1247                )
1248                .collect(),
1249        };
1250        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1251        Ok(())
1252    }
1253
1254    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1255        let req = BackupMetaRequest { remarks };
1256        let resp = self.inner.backup_meta(req).await?;
1257        Ok(resp.job_id)
1258    }
1259
1260    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1261        let req = GetBackupJobStatusRequest { job_id };
1262        let resp = self.inner.get_backup_job_status(req).await?;
1263        Ok((resp.job_status(), resp.message))
1264    }
1265
1266    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1267        let req = DeleteMetaSnapshotRequest {
1268            snapshot_ids: snapshot_ids.to_vec(),
1269        };
1270        let _resp = self.inner.delete_meta_snapshot(req).await?;
1271        Ok(())
1272    }
1273
1274    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1275        let req = GetMetaSnapshotManifestRequest {};
1276        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1277        Ok(resp.manifest.expect("should exist"))
1278    }
1279
1280    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1281        let req = GetTelemetryInfoRequest {};
1282        let resp = self.inner.get_telemetry_info(req).await?;
1283        Ok(resp)
1284    }
1285
1286    pub async fn get_system_params(&self) -> Result<SystemParamsReader> {
1287        let req = GetSystemParamsRequest {};
1288        let resp = self.inner.get_system_params(req).await?;
1289        Ok(resp.params.unwrap().into())
1290    }
1291
1292    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1293        let req = GetMetaStoreInfoRequest {};
1294        let resp = self.inner.get_meta_store_info(req).await?;
1295        Ok(resp.meta_store_endpoint)
1296    }
1297
1298    pub async fn alter_sink_props(
1299        &self,
1300        sink_id: u32,
1301        changed_props: BTreeMap<String, String>,
1302        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1303        connector_conn_ref: Option<u32>,
1304    ) -> Result<()> {
1305        let req = AlterConnectorPropsRequest {
1306            object_id: sink_id,
1307            changed_props: changed_props.into_iter().collect(),
1308            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1309            connector_conn_ref,
1310            object_type: AlterConnectorPropsObject::Sink as i32,
1311        };
1312        let _resp = self.inner.alter_connector_props(req).await?;
1313        Ok(())
1314    }
1315
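    /// Set a single system parameter; returns the updated parameters if the server
    /// includes them in the response.
    ///
    /// A usage sketch (not a runnable doctest); the parameter name and value below are
    /// illustrative assumptions only.
    ///
    /// ```ignore
    /// let updated: Option<SystemParamsReader> = meta_client
    ///     .set_system_param("checkpoint_frequency".to_owned(), Some("10".to_owned()))
    ///     .await?;
    /// ```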
1316    pub async fn set_system_param(
1317        &self,
1318        param: String,
1319        value: Option<String>,
1320    ) -> Result<Option<SystemParamsReader>> {
1321        let req = SetSystemParamRequest { param, value };
1322        let resp = self.inner.set_system_param(req).await?;
1323        Ok(resp.params.map(SystemParamsReader::from))
1324    }
1325
1326    pub async fn get_session_params(&self) -> Result<String> {
1327        let req = GetSessionParamsRequest {};
1328        let resp = self.inner.get_session_params(req).await?;
1329        Ok(resp.params)
1330    }
1331
1332    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1333        let req = SetSessionParamRequest { param, value };
1334        let resp = self.inner.set_session_param(req).await?;
1335        Ok(resp.param)
1336    }
1337
1338    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1339        let req = GetDdlProgressRequest {};
1340        let resp = self.inner.get_ddl_progress(req).await?;
1341        Ok(resp.ddl_progress)
1342    }
1343
1344    pub async fn split_compaction_group(
1345        &self,
1346        group_id: CompactionGroupId,
1347        table_ids_to_new_group: &[StateTableId],
1348        partition_vnode_count: u32,
1349    ) -> Result<CompactionGroupId> {
1350        let req = SplitCompactionGroupRequest {
1351            group_id,
1352            table_ids: table_ids_to_new_group.to_vec(),
1353            partition_vnode_count,
1354        };
1355        let resp = self.inner.split_compaction_group(req).await?;
1356        Ok(resp.new_group_id)
1357    }
1358
1359    pub async fn get_tables(
1360        &self,
1361        table_ids: &[u32],
1362        include_dropped_tables: bool,
1363    ) -> Result<HashMap<u32, Table>> {
1364        let req = GetTablesRequest {
1365            table_ids: table_ids.to_vec(),
1366            include_dropped_tables,
1367        };
1368        let resp = self.inner.get_tables(req).await?;
1369        Ok(resp.tables)
1370    }
1371
1372    pub async fn list_serving_vnode_mappings(
1373        &self,
1374    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
1375        let req = GetServingVnodeMappingsRequest {};
1376        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1377        let mappings = resp
1378            .worker_slot_mappings
1379            .into_iter()
1380            .map(|p| {
1381                (
1382                    p.fragment_id,
1383                    (
1384                        resp.fragment_to_table
1385                            .get(&p.fragment_id)
1386                            .cloned()
1387                            .unwrap_or(0),
1388                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1389                    ),
1390                )
1391            })
1392            .collect();
1393        Ok(mappings)
1394    }
1395
1396    pub async fn risectl_list_compaction_status(
1397        &self,
1398    ) -> Result<(
1399        Vec<CompactStatus>,
1400        Vec<CompactTaskAssignment>,
1401        Vec<CompactTaskProgress>,
1402    )> {
1403        let req = RiseCtlListCompactionStatusRequest {};
1404        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1405        Ok((
1406            resp.compaction_statuses,
1407            resp.task_assignment,
1408            resp.task_progress,
1409        ))
1410    }
1411
1412    pub async fn get_compaction_score(
1413        &self,
1414        compaction_group_id: CompactionGroupId,
1415    ) -> Result<Vec<PickerInfo>> {
1416        let req = GetCompactionScoreRequest {
1417            compaction_group_id,
1418        };
1419        let resp = self.inner.get_compaction_score(req).await?;
1420        Ok(resp.scores)
1421    }
1422
1423    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1424        let req = RiseCtlRebuildTableStatsRequest {};
1425        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1426        Ok(())
1427    }
1428
1429    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1430        let req = ListBranchedObjectRequest {};
1431        let resp = self.inner.list_branched_object(req).await?;
1432        Ok(resp.branched_objects)
1433    }
1434
1435    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
1436        let req = ListActiveWriteLimitRequest {};
1437        let resp = self.inner.list_active_write_limit(req).await?;
1438        Ok(resp.write_limits)
1439    }
1440
1441    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1442        let req = ListHummockMetaConfigRequest {};
1443        let resp = self.inner.list_hummock_meta_config(req).await?;
1444        Ok(resp.configs)
1445    }
1446
1447    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1448        let _resp = self
1449            .inner
1450            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1451            .await?;
1452
1453        Ok(())
1454    }
1455
1456    pub async fn rw_cloud_validate_source(
1457        &self,
1458        source_type: SourceType,
1459        source_config: HashMap<String, String>,
1460    ) -> Result<RwCloudValidateSourceResponse> {
1461        let req = RwCloudValidateSourceRequest {
1462            source_type: source_type.into(),
1463            source_config,
1464        };
1465        let resp = self.inner.rw_cloud_validate_source(req).await?;
1466        Ok(resp)
1467    }
1468
1469    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1470        self.inner.core.read().await.sink_coordinate_client.clone()
1471    }
1472
1473    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1474        let req = ListCompactTaskAssignmentRequest {};
1475        let resp = self.inner.list_compact_task_assignment(req).await?;
1476        Ok(resp.task_assignment)
1477    }
1478
1479    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1480        let req = ListEventLogRequest::default();
1481        let resp = self.inner.list_event_log(req).await?;
1482        Ok(resp.event_logs)
1483    }
1484
1485    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1486        let req = ListCompactTaskProgressRequest {};
1487        let resp = self.inner.list_compact_task_progress(req).await?;
1488        Ok(resp.task_progress)
1489    }
1490
1491    #[cfg(madsim)]
1492    pub fn try_add_panic_event_blocking(
1493        &self,
1494        panic_info: impl Display,
1495        timeout_millis: Option<u64>,
1496    ) {
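        // No-op under madsim (deterministic simulation) builds: panic events are not reported.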
1497    }
1498
1499    /// If `timeout_millis` is `None`, a default of 1000 ms is used.
1500    #[cfg(not(madsim))]
1501    pub fn try_add_panic_event_blocking(
1502        &self,
1503        panic_info: impl Display,
1504        timeout_millis: Option<u64>,
1505    ) {
1506        let event = event_log::EventWorkerNodePanic {
1507            worker_id: self.worker_id,
1508            worker_type: self.worker_type.into(),
1509            host_addr: Some(self.host_addr.to_protobuf()),
1510            panic_info: format!("{panic_info}"),
1511        };
1512        let grpc_meta_client = self.inner.clone();
1513        let _ = thread::spawn(move || {
1514            let rt = tokio::runtime::Builder::new_current_thread()
1515                .enable_all()
1516                .build()
1517                .unwrap();
1518            let req = AddEventLogRequest {
1519                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1520            };
1521            rt.block_on(async {
1522                let _ = tokio::time::timeout(
1523                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1524                    grpc_meta_client.add_event_log(req),
1525                )
1526                .await;
1527            });
1528        })
1529        .join();
1530    }
1531
1532    pub async fn add_sink_fail_evet(
1533        &self,
1534        sink_id: u32,
1535        sink_name: String,
1536        connector: String,
1537        error: String,
1538    ) -> Result<()> {
1539        let event = event_log::EventSinkFail {
1540            sink_id,
1541            sink_name,
1542            connector,
1543            error,
1544        };
1545        let req = AddEventLogRequest {
1546            event: Some(add_event_log_request::Event::SinkFail(event)),
1547        };
1548        self.inner.add_event_log(req).await?;
1549        Ok(())
1550    }
1551
1552    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1553        let req = CancelCompactTaskRequest {
1554            task_id,
1555            task_status: task_status as _,
1556        };
1557        let resp = self.inner.cancel_compact_task(req).await?;
1558        Ok(resp.ret)
1559    }
1560
1561    pub async fn get_version_by_epoch(
1562        &self,
1563        epoch: HummockEpoch,
1564        table_id: u32,
1565    ) -> Result<PbHummockVersion> {
1566        let req = GetVersionByEpochRequest { epoch, table_id };
1567        let resp = self.inner.get_version_by_epoch(req).await?;
1568        Ok(resp.version.unwrap())
1569    }
1570
1571    pub async fn get_cluster_limits(
1572        &self,
1573    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1574        let req = GetClusterLimitsRequest {};
1575        let resp = self.inner.get_cluster_limits(req).await?;
1576        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1577    }
1578
1579    pub async fn merge_compaction_group(
1580        &self,
1581        left_group_id: CompactionGroupId,
1582        right_group_id: CompactionGroupId,
1583    ) -> Result<()> {
1584        let req = MergeCompactionGroupRequest {
1585            left_group_id,
1586            right_group_id,
1587        };
1588        self.inner.merge_compaction_group(req).await?;
1589        Ok(())
1590    }
1591
    /// List all rate limits for sources and backfills.
1593    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1594        let request = ListRateLimitsRequest {};
1595        let resp = self.inner.list_rate_limits(request).await?;
1596        Ok(resp.rate_limits)
1597    }
1598
1599    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1600        let request = ListIcebergTablesRequest {};
1601        let resp = self.inner.list_iceberg_tables(request).await?;
1602        Ok(resp.iceberg_tables)
1603    }
1604}
1605
1606#[async_trait]
1607impl HummockMetaClient for MetaClient {
1608    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1609        let req = UnpinVersionBeforeRequest {
1610            context_id: self.worker_id(),
1611            unpin_version_before: unpin_version_before.to_u64(),
1612        };
1613        self.inner.unpin_version_before(req).await?;
1614        Ok(())
1615    }
1616
1617    async fn get_current_version(&self) -> Result<HummockVersion> {
1618        let req = GetCurrentVersionRequest::default();
1619        Ok(HummockVersion::from_rpc_protobuf(
1620            &self
1621                .inner
1622                .get_current_version(req)
1623                .await?
1624                .current_version
1625                .unwrap(),
1626        ))
1627    }
1628
1629    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1630        let resp = self
1631            .inner
1632            .get_new_object_ids(GetNewObjectIdsRequest { number })
1633            .await?;
1634        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1635    }
1636
1637    async fn commit_epoch_with_change_log(
1638        &self,
1639        _epoch: HummockEpoch,
1640        _sync_result: SyncResult,
1641        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1642    ) -> Result<()> {
1643        panic!("Only meta service can commit_epoch in production.")
1644    }
1645
1646    async fn trigger_manual_compaction(
1647        &self,
1648        compaction_group_id: u64,
1649        table_id: u32,
1650        level: u32,
1651        sst_ids: Vec<u64>,
1652    ) -> Result<()> {
1653        // TODO: support key_range parameter
1654        let req = TriggerManualCompactionRequest {
1655            compaction_group_id,
1656            table_id,
            // If `table_id` does not exist, manual compaction will include all SSTs,
            // without checking `internal_table_id`.
1659            level,
1660            sst_ids,
1661            ..Default::default()
1662        };
1663
1664        self.inner.trigger_manual_compaction(req).await?;
1665        Ok(())
1666    }
1667
1668    async fn trigger_full_gc(
1669        &self,
1670        sst_retention_time_sec: u64,
1671        prefix: Option<String>,
1672    ) -> Result<()> {
1673        self.inner
1674            .trigger_full_gc(TriggerFullGcRequest {
1675                sst_retention_time_sec,
1676                prefix,
1677            })
1678            .await?;
1679        Ok(())
1680    }
1681
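    /// Opens the bidirectional compaction-event stream with the meta service.
    ///
    /// An initial `Register` event carrying this worker's id is sent before the stream is
    /// established; the returned sender pushes further `SubscribeCompactionEventRequest`s and
    /// the returned stream yields the server-side `CompactionEventItem`s.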
1682    async fn subscribe_compaction_event(
1683        &self,
1684    ) -> Result<(
1685        UnboundedSender<SubscribeCompactionEventRequest>,
1686        BoxStream<'static, CompactionEventItem>,
1687    )> {
1688        let (request_sender, request_receiver) =
1689            unbounded_channel::<SubscribeCompactionEventRequest>();
1690        request_sender
1691            .send(SubscribeCompactionEventRequest {
1692                event: Some(subscribe_compaction_event_request::Event::Register(
1693                    Register {
1694                        context_id: self.worker_id(),
1695                    },
1696                )),
1697                create_at: SystemTime::now()
1698                    .duration_since(SystemTime::UNIX_EPOCH)
1699                    .expect("Clock may have gone backwards")
1700                    .as_millis() as u64,
1701            })
1702            .context("Failed to subscribe compaction event")?;
1703
1704        let stream = self
1705            .inner
1706            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1707                request_receiver,
1708            )))
1709            .await?;
1710
1711        Ok((request_sender, Box::pin(stream)))
1712    }
1713
1714    async fn get_version_by_epoch(
1715        &self,
1716        epoch: HummockEpoch,
1717        table_id: u32,
1718    ) -> Result<PbHummockVersion> {
1719        self.get_version_by_epoch(epoch, table_id).await
1720    }
1721
1722    async fn subscribe_iceberg_compaction_event(
1723        &self,
1724    ) -> Result<(
1725        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1726        BoxStream<'static, IcebergCompactionEventItem>,
1727    )> {
1728        let (request_sender, request_receiver) =
1729            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
1730        request_sender
1731            .send(SubscribeIcebergCompactionEventRequest {
1732                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
1733                    IcebergRegister {
1734                        context_id: self.worker_id(),
1735                    },
1736                )),
1737                create_at: SystemTime::now()
1738                    .duration_since(SystemTime::UNIX_EPOCH)
1739                    .expect("Clock may have gone backwards")
1740                    .as_millis() as u64,
1741            })
            .context("Failed to subscribe iceberg compaction event")?;
1743
1744        let stream = self
1745            .inner
1746            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
1747                request_receiver,
1748            )))
1749            .await?;
1750
1751        Ok((request_sender, Box::pin(stream)))
1752    }
1753}
1754
1755#[async_trait]
1756impl TelemetryInfoFetcher for MetaClient {
1757    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
1758        let resp = self
1759            .get_telemetry_info()
1760            .await
1761            .map_err(|e| e.to_report_string())?;
1762        let tracking_id = resp.get_tracking_id().ok();
1763        Ok(tracking_id.map(|id| id.to_owned()))
1764    }
1765}
1766
1767pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
1768
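/// Per-service tonic clients, all sharing a single underlying `Channel` to the current meta
/// leader. The whole core is rebuilt (see `MetaMemberManagement::recreate_core`) whenever a new
/// leader is discovered.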
1769#[derive(Debug, Clone)]
1770struct GrpcMetaClientCore {
1771    cluster_client: ClusterServiceClient<Channel>,
1772    meta_member_client: MetaMemberServiceClient<Channel>,
1773    heartbeat_client: HeartbeatServiceClient<Channel>,
1774    ddl_client: DdlServiceClient<Channel>,
1775    hummock_client: HummockManagerServiceClient<Channel>,
1776    notification_client: NotificationServiceClient<Channel>,
1777    stream_client: StreamManagerServiceClient<Channel>,
1778    user_client: UserServiceClient<Channel>,
1779    scale_client: ScaleServiceClient<Channel>,
1780    backup_client: BackupServiceClient<Channel>,
1781    telemetry_client: TelemetryInfoServiceClient<Channel>,
1782    system_params_client: SystemParamsServiceClient<Channel>,
1783    session_params_client: SessionParamServiceClient<Channel>,
1784    serving_client: ServingServiceClient<Channel>,
1785    cloud_client: CloudServiceClient<Channel>,
1786    sink_coordinate_client: SinkCoordinationRpcClient,
1787    event_log_client: EventLogServiceClient<Channel>,
1788    cluster_limit_client: ClusterLimitServiceClient<Channel>,
1789    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
1790}
1791
1792impl GrpcMetaClientCore {
1793    pub(crate) fn new(channel: Channel) -> Self {
1794        let cluster_client = ClusterServiceClient::new(channel.clone());
1795        let meta_member_client = MetaMemberClient::new(channel.clone());
1796        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
1797        let ddl_client =
1798            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1799        let hummock_client =
1800            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1801        let notification_client =
1802            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1803        let stream_client =
1804            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1805        let user_client = UserServiceClient::new(channel.clone());
1806        let scale_client =
1807            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1808        let backup_client = BackupServiceClient::new(channel.clone());
1809        let telemetry_client =
1810            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1811        let system_params_client = SystemParamsServiceClient::new(channel.clone());
1812        let session_params_client = SessionParamServiceClient::new(channel.clone());
1813        let serving_client = ServingServiceClient::new(channel.clone());
1814        let cloud_client = CloudServiceClient::new(channel.clone());
1815        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
1816        let event_log_client = EventLogServiceClient::new(channel.clone());
1817        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
1818        let hosted_iceberg_catalog_service_client = HostedIcebergCatalogServiceClient::new(channel);
1819
1820        GrpcMetaClientCore {
1821            cluster_client,
1822            meta_member_client,
1823            heartbeat_client,
1824            ddl_client,
1825            hummock_client,
1826            notification_client,
1827            stream_client,
1828            user_client,
1829            scale_client,
1830            backup_client,
1831            telemetry_client,
1832            system_params_client,
1833            session_params_client,
1834            serving_client,
1835            cloud_client,
1836            sink_coordinate_client,
1837            event_log_client,
1838            cluster_limit_client,
1839            hosted_iceberg_catalog_service_client,
1840        }
1841    }
1842}
1843
1844/// Client to meta server. Cloning the instance is lightweight.
1845///
1846/// It is a wrapper of tonic client. See [`crate::meta_rpc_client_method_impl`].
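///
/// A minimal construction sketch (illustrative only; the address and the use of
/// `MetaConfig::default()` are placeholders for the actual deployment configuration):
///
/// ```ignore
/// let strategy = MetaAddressStrategy::List(vec!["http://127.0.0.1:5690".parse().unwrap()]);
/// // Connects, spawns the member monitor, and force-refreshes the leader once.
/// let client = GrpcMetaClient::new(&strategy, MetaConfig::default()).await?;
/// ```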
1847#[derive(Debug, Clone)]
1848struct GrpcMetaClient {
1849    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
1850    core: Arc<RwLock<GrpcMetaClientCore>>,
1851}
1852
1853type MetaMemberClient = MetaMemberServiceClient<Channel>;
1854
1855struct MetaMemberGroup {
1856    members: LruCache<http::Uri, Option<MetaMemberClient>>,
1857}
1858
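/// Tracks the known meta members and the current leader.
///
/// `members` is either a single client behind a load balancer (`Either::Left`) or an LRU group
/// of per-address member clients (`Either::Right`), mirroring the two `MetaAddressStrategy`
/// variants.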
1859struct MetaMemberManagement {
1860    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
1861    members: Either<MetaMemberClient, MetaMemberGroup>,
1862    current_leader: http::Uri,
1863    meta_config: MetaConfig,
1864}
1865
1866impl MetaMemberManagement {
1867    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
1868
1869    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
1870        format!("http://{}:{}", addr.host, addr.port)
1871            .parse()
1872            .unwrap()
1873    }
1874
1875    async fn recreate_core(&self, channel: Channel) {
1876        let mut core = self.core_ref.write().await;
1877        *core = GrpcMetaClientCore::new(channel);
1878    }
1879
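    /// Fetches the current member list (from the single load-balanced client, or from the first
    /// reachable client in the member group), records newly discovered members, and, if the
    /// leader has changed, reconnects and swaps in a fresh `GrpcMetaClientCore`.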
1880    async fn refresh_members(&mut self) -> Result<()> {
1881        let leader_addr = match self.members.as_mut() {
1882            Either::Left(client) => {
1883                let resp = client
1884                    .to_owned()
1885                    .members(MembersRequest {})
1886                    .await
1887                    .map_err(RpcError::from_meta_status)?;
1888                let resp = resp.into_inner();
1889                resp.members.into_iter().find(|member| member.is_leader)
1890            }
1891            Either::Right(member_group) => {
1892                let mut fetched_members = None;
1893
1894                for (addr, client) in &mut member_group.members {
1895                    let members: Result<_> = try {
1896                        let mut client = match client {
1897                            Some(cached_client) => cached_client.to_owned(),
1898                            None => {
1899                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
1900                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
1901                                    .await
1902                                    .context("failed to create client")?;
1903                                let new_client: MetaMemberClient =
1904                                    MetaMemberServiceClient::new(channel);
1905                                *client = Some(new_client.clone());
1906
1907                                new_client
1908                            }
1909                        };
1910
1911                        let resp = client
1912                            .members(MembersRequest {})
1913                            .await
1914                            .context("failed to fetch members")?;
1915
1916                        resp.into_inner().members
1917                    };
1918
1919                    let fetched = members.is_ok();
1920                    fetched_members = Some(members);
1921                    if fetched {
1922                        break;
1923                    }
1924                }
1925
1926                let members = fetched_members
1927                    .context("no member available in the list")?
1928                    .context("could not refresh members")?;
1929
                // Find the new leader and register any newly joined members.
1931                let mut leader = None;
1932                for member in members {
1933                    if member.is_leader {
1934                        leader = Some(member.clone());
1935                    }
1936
1937                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    // We intentionally don't clean up expired addresses here, so as to handle
                    // some extreme situations.
1939                    if !member_group.members.contains(&addr) {
1940                        tracing::info!("new meta member joined: {}", addr);
1941                        member_group.members.put(addr, None);
1942                    }
1943                }
1944
1945                leader
1946            }
1947        };
1948
1949        if let Some(leader) = leader_addr {
1950            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
1951
1952            if discovered_leader != self.current_leader {
1953                tracing::info!("new meta leader {} discovered", discovered_leader);
1954
1955                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
1956                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
1957                    false,
1958                );
1959
1960                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
1961                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
1962                    GrpcMetaClient::connect_to_endpoint(endpoint).await
1963                })
1964                .await?;
1965
1966                self.recreate_core(channel).await;
1967                self.current_leader = discovered_leader;
1968            }
1969        }
1970
1971        Ok(())
1972    }
1973}
1974
1975impl GrpcMetaClient {
1976    // See `Endpoint::http2_keep_alive_interval`
1977    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
1978    // See `Endpoint::keep_alive_timeout`
1979    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
1980    // Retry base interval in ms for connecting to meta server.
1981    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    // Max retry interval in ms for connecting to meta server.
1983    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
1984
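    /// Spawns the background task that keeps the member list and leader up to date. The task
    /// refreshes periodically (every `META_MEMBER_REFRESH_PERIOD`, only when a member group is
    /// used) and on demand via the force-refresh channel, reporting each refresh result back to
    /// the requester, if any.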
1985    fn start_meta_member_monitor(
1986        &self,
1987        init_leader_addr: http::Uri,
1988        members: Either<MetaMemberClient, MetaMemberGroup>,
1989        force_refresh_receiver: Receiver<Sender<Result<()>>>,
1990        meta_config: MetaConfig,
1991    ) -> Result<()> {
1992        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
1993        let current_leader = init_leader_addr;
1994
1995        let enable_period_tick = matches!(members, Either::Right(_));
1996
1997        let member_management = MetaMemberManagement {
1998            core_ref,
1999            members,
2000            current_leader,
2001            meta_config,
2002        };
2003
2004        let mut force_refresh_receiver = force_refresh_receiver;
2005
2006        tokio::spawn(async move {
2007            let mut member_management = member_management;
2008            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2009
2010            loop {
2011                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2012                    tokio::select! {
2013                        _ = ticker.tick() => None,
2014                        result_sender = force_refresh_receiver.recv() => {
2015                            if result_sender.is_none() {
2016                                break;
2017                            }
2018
2019                            result_sender
2020                        },
2021                    }
2022                } else {
2023                    let result_sender = force_refresh_receiver.recv().await;
2024
2025                    if result_sender.is_none() {
2026                        break;
2027                    }
2028
2029                    result_sender
2030                };
2031
2032                let tick_result = member_management.refresh_members().await;
2033                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "failed to refresh meta members");
2035                }
2036
2037                if let Some(sender) = event {
                    // Ignore the send result; the requester may have given up waiting.
2039                    let _resp = sender.send(tick_result);
2040                }
2041            }
2042        });
2043
2044        Ok(())
2045    }
2046
2047    async fn force_refresh_leader(&self) -> Result<()> {
2048        let (sender, receiver) = oneshot::channel();
2049
2050        self.member_monitor_event_sender
2051            .send(sender)
2052            .await
2053            .map_err(|e| anyhow!(e))?;
2054
2055        receiver.await.map_err(|e| anyhow!(e))?
2056    }
2057
    /// Connect to the meta server according to the given address `strategy`.
2059    pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result<Self> {
2060        let (channel, addr) = match strategy {
2061            MetaAddressStrategy::LoadBalance(addr) => {
2062                Self::try_build_rpc_channel(vec![addr.clone()]).await
2063            }
2064            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2065        }?;
2066        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2067        let client = GrpcMetaClient {
2068            member_monitor_event_sender: force_refresh_sender,
2069            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2070        };
2071
2072        let meta_member_client = client.core.read().await.meta_member_client.clone();
2073        let members = match strategy {
2074            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2075            MetaAddressStrategy::List(addrs) => {
2076                let mut members = LruCache::new(20);
2077                for addr in addrs {
2078                    members.put(addr.clone(), None);
2079                }
2080                members.put(addr.clone(), Some(meta_member_client));
2081
2082                Either::Right(MetaMemberGroup { members })
2083            }
2084        };
2085
2086        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config)?;
2087
2088        client.force_refresh_leader().await?;
2089
2090        Ok(client)
2091    }
2092
2093    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2094        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2095    }
2096
2097    pub(crate) async fn try_build_rpc_channel(
2098        addrs: impl IntoIterator<Item = http::Uri>,
2099    ) -> Result<(Channel, http::Uri)> {
2100        let endpoints: Vec<_> = addrs
2101            .into_iter()
2102            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2103            .collect();
2104
2105        let mut last_error = None;
2106
2107        for (endpoint, addr) in endpoints {
2108            match Self::connect_to_endpoint(endpoint).await {
2109                Ok(channel) => {
                    tracing::info!("Connected to meta server {} successfully", addr);
2111                    return Ok((channel, addr));
2112                }
2113                Err(e) => {
2114                    tracing::warn!(
2115                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying the next address",
2117                        addr,
2118                    );
2119                    last_error = Some(e);
2120                }
2121            }
2122        }
2123
2124        if let Some(last_error) = last_error {
2125            Err(anyhow::anyhow!(last_error)
2126                .context("failed to connect to all meta servers")
2127                .into())
2128        } else {
2129            bail!("no meta server address provided")
2130        }
2131    }
2132
2133    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2134        let channel = endpoint
2135            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2136            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2137            .connect_timeout(Duration::from_secs(5))
2138            .monitored_connect("grpc-meta-client", Default::default())
2139            .await?
2140            .wrapped();
2141
2142        Ok(channel)
2143    }
2144
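    /// Returns a jittered exponential-backoff sequence (base `INIT_RETRY_BASE_INTERVAL_MS`,
    /// capped at `INIT_RETRY_MAX_INTERVAL_MS`) that stops once the accumulated delay reaches
    /// `high_bound`; with `exceed`, the delay that crosses the bound is still yielded.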
2145    pub(crate) fn retry_strategy_to_bound(
2146        high_bound: Duration,
2147        exceed: bool,
2148    ) -> impl Iterator<Item = Duration> {
2149        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2150            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2151            .map(jitter);
2152
2153        let mut sum = Duration::default();
2154
2155        iter.take_while(move |duration| {
2156            sum += *duration;
2157
2158            if exceed {
2159                sum < high_bound + *duration
2160            } else {
2161                sum < high_bound
2162            }
2163        })
2164    }
2165}
2166
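/// Enumerates every meta RPC as `{ client_field, method, RequestType, ResponseType }` and feeds
/// the list to the given macro; `meta_rpc_client_method_impl` turns each entry into an async
/// wrapper method on `GrpcMetaClient`.
///
/// Roughly, an entry expands to something of this shape (a simplified sketch, not the exact
/// expansion):
///
/// ```ignore
/// pub async fn flush(&self, request: FlushRequest) -> Result<FlushResponse> {
///     let mut client = self.core.read().await.stream_client.clone();
///     match client.flush(request).await {
///         Ok(resp) => Ok(resp.into_inner()),
///         Err(status) => {
///             self.refresh_client_if_needed(status.code()).await;
///             Err(RpcError::from_meta_status(status))
///         }
///     }
/// }
/// ```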
2167macro_rules! for_all_meta_rpc {
2168    ($macro:ident) => {
2169        $macro! {
2170             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2171            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2172            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2173            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2174            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2175            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2176            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2177            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2178            ,{ stream_client, flush, FlushRequest, FlushResponse }
2179            ,{ stream_client, pause, PauseRequest, PauseResponse }
2180            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2181            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2182            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2183            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2184            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2185            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2186            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2187            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2188            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
2189            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2190            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2191            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2192            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2193            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2194            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2195            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2196            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2197            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2198            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2199            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2200            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2201            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2202            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2203            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2204            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2205            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2206            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2207            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2208            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2209            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2210            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2211            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2212            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2213            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2214            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2215            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2216            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2217            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2218            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2219            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2220            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2221            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2222            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2223            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2224            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2225            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2226            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2227            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2228            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2229            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2230            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2231            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2232            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2233            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2234            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2235            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2236            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2237            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2238            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2239            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2240            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2241            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2242            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2243            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2244            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2245            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2246            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2247            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2248            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2249            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2250            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2251            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2252            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2253            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2254            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2255            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2256            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2257            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2258            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2259            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2260            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2261            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2262            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2263            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2264            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2265            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2266            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2267            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2268            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2269            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2270            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2271            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2272            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2273            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2274            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2275            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2276            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2277            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2278            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2279            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2280            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2281            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2282            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2283            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2284            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2285            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2286            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2287        }
2288    };
2289}
2290
2291impl GrpcMetaClient {
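    /// On status codes that typically indicate a stale leader or broken connection (`Unknown`,
    /// `Unimplemented`, `Unavailable`), ask the member monitor to re-discover the leader and
    /// wait for the refresh result; all other codes are ignored.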
2292    async fn refresh_client_if_needed(&self, code: Code) {
2293        if matches!(
2294            code,
2295            Code::Unknown | Code::Unimplemented | Code::Unavailable
2296        ) {
            tracing::debug!("tonic code {} triggers a meta client refresh", code);
2298            let (result_sender, result_receiver) = oneshot::channel();
2299            if self
2300                .member_monitor_event_sender
2301                .try_send(result_sender)
2302                .is_ok()
2303            {
2304                if let Ok(Err(e)) = result_receiver.await {
2305                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2306                }
2307            } else {
                tracing::debug!("skipping this refresh; another refresh is already in progress")
2309            }
2310        }
2311    }
2312}
2313
2314impl GrpcMetaClient {
2315    for_all_meta_rpc! { meta_rpc_client_method_impl }
2316}