risingwave_rpc_client/
meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, SecretId, TableId};
33use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
34use risingwave_common::hash::WorkerSlotMapping;
35use risingwave_common::monitor::EndpointExt;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::telemetry::report::TelemetryInfoFetcher;
38use risingwave_common::util::addr::HostAddr;
39use risingwave_common::util::column_index_mapping::ColIndexMapping;
40use risingwave_common::util::meta_addr::MetaAddressStrategy;
41use risingwave_common::util::resource_util::cpu::total_cpu_available;
42use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
43use risingwave_error::bail;
44use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
45use risingwave_hummock_sdk::compaction_group::StateTableId;
46use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
47use risingwave_hummock_sdk::{
48    CompactionGroupId, HummockEpoch, HummockVersionId, SstObjectIdRange, SyncResult,
49};
50use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
51use risingwave_pb::backup_service::*;
52use risingwave_pb::catalog::{
53    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
54    PbSubscription, PbTable, PbView, Table,
55};
56use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
57use risingwave_pb::cloud_service::*;
58use risingwave_pb::common::worker_node::Property;
59use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
60use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
61use risingwave_pb::ddl_service::alter_owner_request::Object;
62use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
63use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
64use risingwave_pb::ddl_service::drop_table_request::SourceId;
65use risingwave_pb::ddl_service::*;
66use risingwave_pb::hummock::compact_task::TaskStatus;
67use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
68use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
69use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
70use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
71use risingwave_pb::hummock::write_limits::WriteLimit;
72use risingwave_pb::hummock::*;
73use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
74use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
75use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
76use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
77use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
78use risingwave_pb::meta::list_actor_states_response::ActorState;
79use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
80use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
81use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
82use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
83use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
84use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
85use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
86use risingwave_pb::meta::serving_service_client::ServingServiceClient;
87use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
88use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
89use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
90use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
91use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
92use risingwave_pb::meta::*;
93use risingwave_pb::stream_plan::StreamFragmentGraph;
94use risingwave_pb::user::update_user_request::UpdateField;
95use risingwave_pb::user::user_service_client::UserServiceClient;
96use risingwave_pb::user::*;
97use thiserror_ext::AsReport;
98use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
99use tokio::sync::oneshot::Sender;
100use tokio::sync::{RwLock, mpsc, oneshot};
101use tokio::task::JoinHandle;
102use tokio::time::{self};
103use tokio_retry::strategy::{ExponentialBackoff, jitter};
104use tokio_stream::wrappers::UnboundedReceiverStream;
105use tonic::transport::Endpoint;
106use tonic::{Code, Request, Streaming};
107
108use crate::channel::{Channel, WrappedChannelExt};
109use crate::error::{Result, RpcError};
110use crate::hummock_meta_client::{
111    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
112};
113use crate::meta_rpc_client_method_impl;
114
// Raw protobuf identifier types used by the catalog RPCs below.
type ConnectionId = u32;
type DatabaseId = u32;
type SchemaId = u32;
118
/// Client to meta server. Cloning the instance is lightweight.
#[derive(Clone, Debug)]
pub struct MetaClient {
    // Worker id assigned by meta in `AddWorkerNodeResponse`.
    worker_id: u32,
    // Type of this worker node (e.g. compute, frontend, compactor).
    worker_type: WorkerType,
    // Address this worker registered with.
    host_addr: HostAddr,
    // Underlying gRPC client bundle for all meta services.
    inner: GrpcMetaClient,
    // Meta-related configuration (e.g. heartbeat interval used as retry bound).
    meta_config: MetaConfig,
    // Cluster id returned by meta at registration time.
    cluster_id: String,
    // Set once `unregister` succeeds, so an "unknown worker" heartbeat
    // response during shutdown does not abort the process.
    shutting_down: Arc<AtomicBool>,
}
130
131impl MetaClient {
    /// Returns the worker id assigned by the meta service at registration.
    pub fn worker_id(&self) -> u32 {
        self.worker_id
    }
135
    /// Returns the host address this worker registered with.
    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }
139
    /// Returns the type of this worker node.
    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }
143
    /// Returns the cluster id reported by meta at registration time.
    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }
147
148    /// Subscribe to notification from meta.
149    pub async fn subscribe(
150        &self,
151        subscribe_type: SubscribeType,
152    ) -> Result<Streaming<SubscribeResponse>> {
153        let request = SubscribeRequest {
154            subscribe_type: subscribe_type as i32,
155            host: Some(self.host_addr.to_protobuf()),
156            worker_id: self.worker_id(),
157        };
158
159        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
160            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
161            true,
162        );
163
164        tokio_retry::Retry::spawn(retry_strategy, || async {
165            let request = request.clone();
166            self.inner.subscribe(request).await
167        })
168        .await
169    }
170
171    pub async fn create_connection(
172        &self,
173        connection_name: String,
174        database_id: u32,
175        schema_id: u32,
176        owner_id: u32,
177        req: create_connection_request::Payload,
178    ) -> Result<WaitVersion> {
179        let request = CreateConnectionRequest {
180            name: connection_name,
181            database_id,
182            schema_id,
183            owner_id,
184            payload: Some(req),
185        };
186        let resp = self.inner.create_connection(request).await?;
187        Ok(resp
188            .version
189            .ok_or_else(|| anyhow!("wait version not set"))?)
190    }
191
192    pub async fn create_secret(
193        &self,
194        secret_name: String,
195        database_id: u32,
196        schema_id: u32,
197        owner_id: u32,
198        value: Vec<u8>,
199    ) -> Result<WaitVersion> {
200        let request = CreateSecretRequest {
201            name: secret_name,
202            database_id,
203            schema_id,
204            owner_id,
205            value,
206        };
207        let resp = self.inner.create_secret(request).await?;
208        Ok(resp
209            .version
210            .ok_or_else(|| anyhow!("wait version not set"))?)
211    }
212
213    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
214        let request = ListConnectionsRequest {};
215        let resp = self.inner.list_connections(request).await?;
216        Ok(resp.connections)
217    }
218
219    pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result<WaitVersion> {
220        let request = DropConnectionRequest { connection_id };
221        let resp = self.inner.drop_connection(request).await?;
222        Ok(resp
223            .version
224            .ok_or_else(|| anyhow!("wait version not set"))?)
225    }
226
227    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
228        let request = DropSecretRequest {
229            secret_id: secret_id.into(),
230        };
231        let resp = self.inner.drop_secret(request).await?;
232        Ok(resp
233            .version
234            .ok_or_else(|| anyhow!("wait version not set"))?)
235    }
236
237    /// Register the current node to the cluster and set the corresponding worker id.
238    ///
239    /// Retry if there's connection issue with the meta node. Exit the process if the registration fails.
240    pub async fn register_new(
241        addr_strategy: MetaAddressStrategy,
242        worker_type: WorkerType,
243        addr: &HostAddr,
244        property: Property,
245        meta_config: &MetaConfig,
246    ) -> (Self, SystemParamsReader) {
247        let ret =
248            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
249
250        match ret {
251            Ok(ret) => ret,
252            Err(err) => {
253                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
254                std::process::exit(1);
255            }
256        }
257    }
258
    /// One registration pipeline: connect to meta, add this worker to the
    /// cluster, and fetch the initial system params. Returns the constructed
    /// client together with the initial system-params snapshot.
    ///
    /// Retried as a whole (including re-establishing the gRPC connection) on
    /// transient connection errors, bounded by `max_heartbeat_interval_secs`.
    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: &MetaConfig,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        // Retry until reaching `max_heartbeat_interval_secs`
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                // A fresh gRPC connection is established on every attempt.
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        // Report this node's version and resource capacity to meta.
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Only retry if there's any transient connection issue.
            // If the error is from our implementation or business, do not retry it.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.to_owned(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        // Install the panic-reporting hook at most once per process, even if
        // registration runs more than once.
        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report panic event to meta node.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }
337
338    /// Activate the current node in cluster to confirm it's ready to serve.
339    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
340        let request = ActivateWorkerNodeRequest {
341            host: Some(addr.to_protobuf()),
342            node_id: self.worker_id,
343        };
344        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
345            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
346            true,
347        );
348        tokio_retry::Retry::spawn(retry_strategy, || async {
349            let request = request.clone();
350            self.inner.activate_worker_node(request).await
351        })
352        .await?;
353
354        Ok(())
355    }
356
357    /// Send heartbeat signal to meta service.
358    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
359        let request = HeartbeatRequest { node_id };
360        let resp = self.inner.heartbeat(request).await?;
361        if let Some(status) = resp.status {
362            if status.code() == risingwave_pb::common::status::Code::UnknownWorker {
363                // Ignore the error if we're already shutting down.
364                // Otherwise, exit the process.
365                if !self.shutting_down.load(Relaxed) {
366                    tracing::error!(message = status.message, "worker expired");
367                    std::process::exit(1);
368                }
369            }
370        }
371        Ok(())
372    }
373
374    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
375        let request = CreateDatabaseRequest { db: Some(db) };
376        let resp = self.inner.create_database(request).await?;
377        // TODO: handle error in `resp.status` here
378        Ok(resp
379            .version
380            .ok_or_else(|| anyhow!("wait version not set"))?)
381    }
382
383    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
384        let request = CreateSchemaRequest {
385            schema: Some(schema),
386        };
387        let resp = self.inner.create_schema(request).await?;
388        // TODO: handle error in `resp.status` here
389        Ok(resp
390            .version
391            .ok_or_else(|| anyhow!("wait version not set"))?)
392    }
393
394    pub async fn create_materialized_view(
395        &self,
396        table: PbTable,
397        graph: StreamFragmentGraph,
398        dependencies: HashSet<ObjectId>,
399        specific_resource_group: Option<String>,
400    ) -> Result<WaitVersion> {
401        let request = CreateMaterializedViewRequest {
402            materialized_view: Some(table),
403            fragment_graph: Some(graph),
404            backfill: PbBackfillType::Regular as _,
405            dependencies: dependencies.into_iter().collect(),
406            specific_resource_group,
407        };
408        let resp = self.inner.create_materialized_view(request).await?;
409        // TODO: handle error in `resp.status` here
410        Ok(resp
411            .version
412            .ok_or_else(|| anyhow!("wait version not set"))?)
413    }
414
415    pub async fn drop_materialized_view(
416        &self,
417        table_id: TableId,
418        cascade: bool,
419    ) -> Result<WaitVersion> {
420        let request = DropMaterializedViewRequest {
421            table_id: table_id.table_id(),
422            cascade,
423        };
424
425        let resp = self.inner.drop_materialized_view(request).await?;
426        Ok(resp
427            .version
428            .ok_or_else(|| anyhow!("wait version not set"))?)
429    }
430
431    pub async fn create_source(
432        &self,
433        source: PbSource,
434        graph: Option<StreamFragmentGraph>,
435    ) -> Result<WaitVersion> {
436        let request = CreateSourceRequest {
437            source: Some(source),
438            fragment_graph: graph,
439        };
440
441        let resp = self.inner.create_source(request).await?;
442        Ok(resp
443            .version
444            .ok_or_else(|| anyhow!("wait version not set"))?)
445    }
446
447    pub async fn create_sink(
448        &self,
449        sink: PbSink,
450        graph: StreamFragmentGraph,
451        affected_table_change: Option<ReplaceJobPlan>,
452        dependencies: HashSet<ObjectId>,
453    ) -> Result<WaitVersion> {
454        let request = CreateSinkRequest {
455            sink: Some(sink),
456            fragment_graph: Some(graph),
457            affected_table_change,
458            dependencies: dependencies.into_iter().collect(),
459        };
460
461        let resp = self.inner.create_sink(request).await?;
462        Ok(resp
463            .version
464            .ok_or_else(|| anyhow!("wait version not set"))?)
465    }
466
467    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
468        let request = CreateSubscriptionRequest {
469            subscription: Some(subscription),
470        };
471
472        let resp = self.inner.create_subscription(request).await?;
473        Ok(resp
474            .version
475            .ok_or_else(|| anyhow!("wait version not set"))?)
476    }
477
478    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
479        let request = CreateFunctionRequest {
480            function: Some(function),
481        };
482        let resp = self.inner.create_function(request).await?;
483        Ok(resp
484            .version
485            .ok_or_else(|| anyhow!("wait version not set"))?)
486    }
487
488    pub async fn create_table(
489        &self,
490        source: Option<PbSource>,
491        table: PbTable,
492        graph: StreamFragmentGraph,
493        job_type: PbTableJobType,
494    ) -> Result<WaitVersion> {
495        let request = CreateTableRequest {
496            materialized_view: Some(table),
497            fragment_graph: Some(graph),
498            source,
499            job_type: job_type as _,
500        };
501        let resp = self.inner.create_table(request).await?;
502        // TODO: handle error in `resp.status` here
503        Ok(resp
504            .version
505            .ok_or_else(|| anyhow!("wait version not set"))?)
506    }
507
508    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
509        let request = CommentOnRequest {
510            comment: Some(comment),
511        };
512        let resp = self.inner.comment_on(request).await?;
513        Ok(resp
514            .version
515            .ok_or_else(|| anyhow!("wait version not set"))?)
516    }
517
518    pub async fn alter_name(
519        &self,
520        object: alter_name_request::Object,
521        name: &str,
522    ) -> Result<WaitVersion> {
523        let request = AlterNameRequest {
524            object: Some(object),
525            new_name: name.to_owned(),
526        };
527        let resp = self.inner.alter_name(request).await?;
528        Ok(resp
529            .version
530            .ok_or_else(|| anyhow!("wait version not set"))?)
531    }
532
533    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
534        let request = AlterOwnerRequest {
535            object: Some(object),
536            owner_id,
537        };
538        let resp = self.inner.alter_owner(request).await?;
539        Ok(resp
540            .version
541            .ok_or_else(|| anyhow!("wait version not set"))?)
542    }
543
544    pub async fn alter_set_schema(
545        &self,
546        object: alter_set_schema_request::Object,
547        new_schema_id: u32,
548    ) -> Result<WaitVersion> {
549        let request = AlterSetSchemaRequest {
550            new_schema_id,
551            object: Some(object),
552        };
553        let resp = self.inner.alter_set_schema(request).await?;
554        Ok(resp
555            .version
556            .ok_or_else(|| anyhow!("wait version not set"))?)
557    }
558
559    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
560        let request = AlterSourceRequest {
561            source: Some(source),
562        };
563        let resp = self.inner.alter_source(request).await?;
564        Ok(resp
565            .version
566            .ok_or_else(|| anyhow!("wait version not set"))?)
567    }
568
569    pub async fn alter_parallelism(
570        &self,
571        table_id: u32,
572        parallelism: PbTableParallelism,
573        deferred: bool,
574    ) -> Result<()> {
575        let request = AlterParallelismRequest {
576            table_id,
577            parallelism: Some(parallelism),
578            deferred,
579        };
580
581        self.inner.alter_parallelism(request).await?;
582        Ok(())
583    }
584
585    pub async fn alter_resource_group(
586        &self,
587        table_id: u32,
588        resource_group: Option<String>,
589        deferred: bool,
590    ) -> Result<()> {
591        let request = AlterResourceGroupRequest {
592            table_id,
593            resource_group,
594            deferred,
595        };
596
597        self.inner.alter_resource_group(request).await?;
598        Ok(())
599    }
600
601    pub async fn alter_swap_rename(
602        &self,
603        object: alter_swap_rename_request::Object,
604    ) -> Result<WaitVersion> {
605        let request = AlterSwapRenameRequest {
606            object: Some(object),
607        };
608        let resp = self.inner.alter_swap_rename(request).await?;
609        Ok(resp
610            .version
611            .ok_or_else(|| anyhow!("wait version not set"))?)
612    }
613
614    pub async fn alter_secret(
615        &self,
616        secret_id: u32,
617        secret_name: String,
618        database_id: u32,
619        schema_id: u32,
620        owner_id: u32,
621        value: Vec<u8>,
622    ) -> Result<WaitVersion> {
623        let request = AlterSecretRequest {
624            secret_id,
625            name: secret_name,
626            database_id,
627            schema_id,
628            owner_id,
629            value,
630        };
631        let resp = self.inner.alter_secret(request).await?;
632        Ok(resp
633            .version
634            .ok_or_else(|| anyhow!("wait version not set"))?)
635    }
636
637    pub async fn replace_job(
638        &self,
639        graph: StreamFragmentGraph,
640        table_col_index_mapping: ColIndexMapping,
641        replace_job: ReplaceJob,
642    ) -> Result<WaitVersion> {
643        let request = ReplaceJobPlanRequest {
644            plan: Some(ReplaceJobPlan {
645                fragment_graph: Some(graph),
646                table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()),
647                replace_job: Some(replace_job),
648            }),
649        };
650        let resp = self.inner.replace_job_plan(request).await?;
651        // TODO: handle error in `resp.status` here
652        Ok(resp
653            .version
654            .ok_or_else(|| anyhow!("wait version not set"))?)
655    }
656
657    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
658        let request = AutoSchemaChangeRequest {
659            schema_change: Some(schema_change),
660        };
661        let _ = self.inner.auto_schema_change(request).await?;
662        Ok(())
663    }
664
665    pub async fn create_view(&self, view: PbView) -> Result<WaitVersion> {
666        let request = CreateViewRequest { view: Some(view) };
667        let resp = self.inner.create_view(request).await?;
668        // TODO: handle error in `resp.status` here
669        Ok(resp
670            .version
671            .ok_or_else(|| anyhow!("wait version not set"))?)
672    }
673
674    pub async fn create_index(
675        &self,
676        index: PbIndex,
677        table: PbTable,
678        graph: StreamFragmentGraph,
679    ) -> Result<WaitVersion> {
680        let request = CreateIndexRequest {
681            index: Some(index),
682            index_table: Some(table),
683            fragment_graph: Some(graph),
684        };
685        let resp = self.inner.create_index(request).await?;
686        // TODO: handle error in `resp.status` here
687        Ok(resp
688            .version
689            .ok_or_else(|| anyhow!("wait version not set"))?)
690    }
691
692    pub async fn drop_table(
693        &self,
694        source_id: Option<u32>,
695        table_id: TableId,
696        cascade: bool,
697    ) -> Result<WaitVersion> {
698        let request = DropTableRequest {
699            source_id: source_id.map(SourceId::Id),
700            table_id: table_id.table_id(),
701            cascade,
702        };
703
704        let resp = self.inner.drop_table(request).await?;
705        Ok(resp
706            .version
707            .ok_or_else(|| anyhow!("wait version not set"))?)
708    }
709
710    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
711        let request = DropViewRequest { view_id, cascade };
712        let resp = self.inner.drop_view(request).await?;
713        Ok(resp
714            .version
715            .ok_or_else(|| anyhow!("wait version not set"))?)
716    }
717
718    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
719        let request = DropSourceRequest { source_id, cascade };
720        let resp = self.inner.drop_source(request).await?;
721        Ok(resp
722            .version
723            .ok_or_else(|| anyhow!("wait version not set"))?)
724    }
725
726    pub async fn drop_sink(
727        &self,
728        sink_id: u32,
729        cascade: bool,
730        affected_table_change: Option<ReplaceJobPlan>,
731    ) -> Result<WaitVersion> {
732        let request = DropSinkRequest {
733            sink_id,
734            cascade,
735            affected_table_change,
736        };
737        let resp = self.inner.drop_sink(request).await?;
738        Ok(resp
739            .version
740            .ok_or_else(|| anyhow!("wait version not set"))?)
741    }
742
743    pub async fn drop_subscription(
744        &self,
745        subscription_id: u32,
746        cascade: bool,
747    ) -> Result<WaitVersion> {
748        let request = DropSubscriptionRequest {
749            subscription_id,
750            cascade,
751        };
752        let resp = self.inner.drop_subscription(request).await?;
753        Ok(resp
754            .version
755            .ok_or_else(|| anyhow!("wait version not set"))?)
756    }
757
758    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
759        let request = DropIndexRequest {
760            index_id: index_id.index_id,
761            cascade,
762        };
763        let resp = self.inner.drop_index(request).await?;
764        Ok(resp
765            .version
766            .ok_or_else(|| anyhow!("wait version not set"))?)
767    }
768
769    pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
770        let request = DropFunctionRequest {
771            function_id: function_id.0,
772        };
773        let resp = self.inner.drop_function(request).await?;
774        Ok(resp
775            .version
776            .ok_or_else(|| anyhow!("wait version not set"))?)
777    }
778
779    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
780        let request = DropDatabaseRequest { database_id };
781        let resp = self.inner.drop_database(request).await?;
782        Ok(resp
783            .version
784            .ok_or_else(|| anyhow!("wait version not set"))?)
785    }
786
787    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
788        let request = DropSchemaRequest { schema_id, cascade };
789        let resp = self.inner.drop_schema(request).await?;
790        Ok(resp
791            .version
792            .ok_or_else(|| anyhow!("wait version not set"))?)
793    }
794
795    // TODO: using UserInfoVersion instead as return type.
796    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
797        let request = CreateUserRequest { user: Some(user) };
798        let resp = self.inner.create_user(request).await?;
799        Ok(resp.version)
800    }
801
802    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
803        let request = DropUserRequest { user_id };
804        let resp = self.inner.drop_user(request).await?;
805        Ok(resp.version)
806    }
807
808    pub async fn update_user(
809        &self,
810        user: UserInfo,
811        update_fields: Vec<UpdateField>,
812    ) -> Result<u64> {
813        let request = UpdateUserRequest {
814            user: Some(user),
815            update_fields: update_fields
816                .into_iter()
817                .map(|field| field as i32)
818                .collect::<Vec<_>>(),
819        };
820        let resp = self.inner.update_user(request).await?;
821        Ok(resp.version)
822    }
823
    /// Grants `privileges` to every user in `user_ids`, recorded as granted by
    /// `granted_by`; returns the resulting user-info version.
    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    /// Revokes `privileges` from every user in `user_ids`; returns the resulting
    /// user-info version. The interplay of `revoke_grant_option` and `cascade`
    /// is enforced server-side.
    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }
861
    /// Unregister the current node from the cluster.
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        // Record that this client is shutting down so other tasks holding the
        // client can observe it (the flag's consumers are elsewhere in this file).
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    /// Try to unregister the current worker from the cluster with best effort. Log the result.
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }
890
    /// Updates the schedulability of the given worker nodes.
    pub async fn update_schedulability(
        &self,
        worker_ids: &[u32],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    /// Lists cluster worker nodes, optionally filtered by `worker_type`.
    /// Nodes still in the starting state are included.
    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }
918
    /// Starts a heartbeat worker.
    ///
    /// Returns the spawned task's handle and a oneshot sender that stops the
    /// loop when fired (or dropped).
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    // `biased` makes shutdown take priority over a ready tick.
                    biased;
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Wait for interval
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                // Bound each heartbeat RPC so a stuck request cannot stall the loop.
                match tokio::time::timeout(
                    // TODO: decide better min_interval for timeout
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }
958
    /// Lists all internal state tables, for `risectl`.
    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    /// Requests a flush for `database_id`; returns the resulting hummock version id.
    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    /// Sends a wait request; the completion condition is defined by the meta service.
    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    /// Asks the meta service to trigger a recovery.
    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    /// Cancels the given creating jobs; returns the ids of jobs actually canceled.
    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    /// Fetches fragment info for the given table ids, keyed by table id.
    pub async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: table_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    /// Lists the states of all streaming jobs.
    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    /// Lists the distribution info of all fragments.
    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    /// Lists the states of all actors.
    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    /// Lists the split assignments of all actors.
    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    /// Lists dependencies between catalog objects.
    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    /// Sends a pause request to the meta service.
    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    /// Sends a resume request to the meta service.
    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }
1052
    /// Applies a rate limit (`rate`; `None` clears it) to the entity `id` of
    /// throttle-target kind `kind`.
    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            // Protobuf transports enum values as i32.
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    /// Fetches the cluster's current recovery status.
    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        // NOTE(review): assumes the server always sets a decodable status — confirm.
        Ok(resp.get_status().unwrap())
    }

    /// Fetches a snapshot of overall cluster info.
    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    /// Submits a per-worker reschedule plan guarded by `revision`; returns
    /// whether it succeeded plus the new revision.
    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    /// Fetches a summary of pinned hummock versions, for `risectl`.
    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    /// Fetches the checkpointed hummock version, for `risectl`.
    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    /// Pauses hummock version checkpointing, for `risectl`.
    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    /// Resumes hummock version checkpointing, for `risectl`.
    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }
1126
    /// Seeds the meta service with table and compaction-group metadata for
    /// hummock version replay.
    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    /// Replays one version delta; returns the resulting version together with
    /// the ids of the compaction groups it modified.
    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    /// Lists at most `num_limit` version deltas starting from `start_id`,
    /// bounded by `committed_epoch_limit`.
    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    /// Triggers compaction of the given groups at `version_id`, used by
    /// deterministic (simulation) testing.
    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    /// Disables epoch commit on the meta service; returns the current hummock version.
    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }
1201
    /// Returns how many compaction tasks are currently assigned.
    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    /// Lists all compaction groups, for `risectl`.
    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    /// Applies each mutable config in `configs` to every listed compaction group.
    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            // Wrap each config in the request's oneof-style wrapper message.
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    /// Starts a meta backup job with optional `remarks`; returns the job id.
    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    /// Fetches the status of a backup job plus its status message.
    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    /// Deletes the given meta snapshots.
    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    /// Fetches the manifest describing all meta snapshots.
    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    /// Fetches telemetry info (e.g. the tracking id) from the meta service.
    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    /// Fetches the current system parameters.
    pub async fn get_system_params(&self) -> Result<SystemParamsReader> {
        let req = GetSystemParamsRequest {};
        let resp = self.inner.get_system_params(req).await?;
        Ok(resp.params.unwrap().into())
    }

    /// Returns the endpoint of the backing meta store.
    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    /// Sets system parameter `param` to `value`; returns the updated parameters
    /// when the server includes them in the response.
    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
        let req = SetSystemParamRequest { param, value };
        let resp = self.inner.set_system_param(req).await?;
        Ok(resp.params.map(SystemParamsReader::from))
    }

    /// Fetches the session parameters as a string payload.
    pub async fn get_session_params(&self) -> Result<String> {
        let req = GetSessionParamsRequest {};
        let resp = self.inner.get_session_params(req).await?;
        Ok(resp.params)
    }

    /// Sets session parameter `param` to `value`; returns the server's echoed value.
    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
        let req = SetSessionParamRequest { param, value };
        let resp = self.inner.set_session_param(req).await?;
        Ok(resp.param)
    }

    /// Fetches the progress of all in-flight DDL jobs.
    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
        let req = GetDdlProgressRequest {};
        let resp = self.inner.get_ddl_progress(req).await?;
        Ok(resp.ddl_progress)
    }
1305
    /// Splits `table_ids_to_new_group` out of `group_id` into a new compaction
    /// group; returns the new group's id.
    pub async fn split_compaction_group(
        &self,
        group_id: CompactionGroupId,
        table_ids_to_new_group: &[StateTableId],
        partition_vnode_count: u32,
    ) -> Result<CompactionGroupId> {
        let req = SplitCompactionGroupRequest {
            group_id,
            table_ids: table_ids_to_new_group.to_vec(),
            partition_vnode_count,
        };
        let resp = self.inner.split_compaction_group(req).await?;
        Ok(resp.new_group_id)
    }

    /// Fetches table catalogs by id, keyed by table id; `include_dropped_tables`
    /// also returns tables that have been dropped.
    pub async fn get_tables(
        &self,
        table_ids: &[u32],
        include_dropped_tables: bool,
    ) -> Result<HashMap<u32, Table>> {
        let req = GetTablesRequest {
            table_ids: table_ids.to_vec(),
            include_dropped_tables,
        };
        let resp = self.inner.get_tables(req).await?;
        Ok(resp.tables)
    }

    /// Returns serving vnode mappings keyed by fragment id, each paired with the
    /// owning table id (0 when no fragment-to-table entry exists).
    pub async fn list_serving_vnode_mappings(
        &self,
    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
        let req = GetServingVnodeMappingsRequest {};
        let resp = self.inner.get_serving_vnode_mappings(req).await?;
        let mappings = resp
            .worker_slot_mappings
            .into_iter()
            .map(|p| {
                (
                    p.fragment_id,
                    (
                        resp.fragment_to_table
                            .get(&p.fragment_id)
                            .cloned()
                            .unwrap_or(0),
                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
                    ),
                )
            })
            .collect();
        Ok(mappings)
    }
1357
    /// Fetches compaction statuses, task assignments, and task progress, for `risectl`.
    pub async fn risectl_list_compaction_status(
        &self,
    ) -> Result<(
        Vec<CompactStatus>,
        Vec<CompactTaskAssignment>,
        Vec<CompactTaskProgress>,
    )> {
        let req = RiseCtlListCompactionStatusRequest {};
        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
        Ok((
            resp.compaction_statuses,
            resp.task_assignment,
            resp.task_progress,
        ))
    }

    /// Fetches per-picker compaction scores for one compaction group.
    pub async fn get_compaction_score(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Result<Vec<PickerInfo>> {
        let req = GetCompactionScoreRequest {
            compaction_group_id,
        };
        let resp = self.inner.get_compaction_score(req).await?;
        Ok(resp.scores)
    }

    /// Asks the meta service to rebuild table statistics, for `risectl`.
    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
        let req = RiseCtlRebuildTableStatsRequest {};
        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
        Ok(())
    }

    /// Lists all branched SST objects.
    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
        let req = ListBranchedObjectRequest {};
        let resp = self.inner.list_branched_object(req).await?;
        Ok(resp.branched_objects)
    }

    /// Lists active write limits keyed by compaction group id.
    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
        let req = ListActiveWriteLimitRequest {};
        let resp = self.inner.list_active_write_limit(req).await?;
        Ok(resp.write_limits)
    }

    /// Fetches hummock-related meta configuration as key/value pairs.
    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
        let req = ListHummockMetaConfigRequest {};
        let resp = self.inner.list_hummock_meta_config(req).await?;
        Ok(resp.configs)
    }

    /// Deletes the worker node identified by `worker`'s host address.
    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
        let _resp = self
            .inner
            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
            .await?;

        Ok(())
    }

    /// Validates a source configuration via the cloud service.
    pub async fn rw_cloud_validate_source(
        &self,
        source_type: SourceType,
        source_config: HashMap<String, String>,
    ) -> Result<RwCloudValidateSourceResponse> {
        let req = RwCloudValidateSourceRequest {
            source_type: source_type.into(),
            source_config,
        };
        let resp = self.inner.rw_cloud_validate_source(req).await?;
        Ok(resp)
    }

    /// Returns a clone of the sink coordination client sharing this client's channel.
    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
        self.inner.core.read().await.sink_coordinate_client.clone()
    }

    /// Lists all compaction task assignments.
    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
        let req = ListCompactTaskAssignmentRequest {};
        let resp = self.inner.list_compact_task_assignment(req).await?;
        Ok(resp.task_assignment)
    }

    /// Lists event-log entries recorded by the meta service.
    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
        let req = ListEventLogRequest::default();
        let resp = self.inner.list_event_log(req).await?;
        Ok(resp.event_logs)
    }

    /// Lists progress of all running compaction tasks.
    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
        let req = ListCompactTaskProgressRequest {};
        let resp = self.inner.list_compact_task_progress(req).await?;
        Ok(resp.task_progress)
    }
1452
    /// No-op stub under madsim simulation.
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }

    /// If `timeout_millis` is None, default is used.
    ///
    /// Best-effort: reports a worker-panic event to the meta service from a
    /// dedicated OS thread running its own current-thread runtime, then blocks
    /// until the attempt finishes. Errors and timeouts are ignored.
    /// NOTE(review): the separate thread/runtime is presumably to avoid relying
    /// on an ambient tokio context during panic handling — confirm.
    #[cfg(not(madsim))]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
        let event = event_log::EventWorkerNodePanic {
            worker_id: self.worker_id,
            worker_type: self.worker_type.into(),
            host_addr: Some(self.host_addr.to_protobuf()),
            panic_info: format!("{panic_info}"),
        };
        let grpc_meta_client = self.inner.clone();
        let _ = thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let req = AddEventLogRequest {
                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
            };
            rt.block_on(async {
                // Default timeout: 1000 ms.
                let _ = tokio::time::timeout(
                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
                    grpc_meta_client.add_event_log(req),
                )
                .await;
            });
        })
        .join();
    }
1493
    /// Records a sink-failure event in the meta service's event log.
    ///
    /// NOTE(review): the method name has a typo (`evet` → `event`); renaming
    /// would break existing callers, so it is kept as-is.
    pub async fn add_sink_fail_evet(
        &self,
        sink_id: u32,
        sink_name: String,
        connector: String,
        error: String,
    ) -> Result<()> {
        let event = event_log::EventSinkFail {
            sink_id,
            sink_name,
            connector,
            error,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::SinkFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }
1513
    /// Cancels compaction task `task_id`, recording it with `task_status`;
    /// returns the server's success flag.
    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        let req = CancelCompactTaskRequest {
            task_id,
            // Protobuf transports enum values as i32.
            task_status: task_status as _,
        };
        let resp = self.inner.cancel_compact_task(req).await?;
        Ok(resp.ret)
    }

    /// Fetches the hummock version visible at `epoch` for `table_id`.
    pub async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: u32,
    ) -> Result<PbHummockVersion> {
        let req = GetVersionByEpochRequest { epoch, table_id };
        let resp = self.inner.get_version_by_epoch(req).await?;
        Ok(resp.version.unwrap())
    }

    /// Fetches the currently active cluster limits.
    pub async fn get_cluster_limits(
        &self,
    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
        let req = GetClusterLimitsRequest {};
        let resp = self.inner.get_cluster_limits(req).await?;
        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
    }

    /// Merges compaction group `right_group_id` into `left_group_id`.
    pub async fn merge_compaction_group(
        &self,
        left_group_id: CompactionGroupId,
        right_group_id: CompactionGroupId,
    ) -> Result<()> {
        let req = MergeCompactionGroupRequest {
            left_group_id,
            right_group_id,
        };
        self.inner.merge_compaction_group(req).await?;
        Ok(())
    }

    /// List all rate limits for sources and backfills
    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
        let request = ListRateLimitsRequest {};
        let resp = self.inner.list_rate_limits(request).await?;
        Ok(resp.rate_limits)
    }
1560}
1561
#[async_trait]
impl HummockMetaClient for MetaClient {
    /// Unpins all hummock versions before `unpin_version_before` for this
    /// worker (used as the context id).
    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
        let req = UnpinVersionBeforeRequest {
            context_id: self.worker_id(),
            unpin_version_before: unpin_version_before.to_u64(),
        };
        self.inner.unpin_version_before(req).await?;
        Ok(())
    }

    /// Fetches the current hummock version from the meta service.
    async fn get_current_version(&self) -> Result<HummockVersion> {
        let req = GetCurrentVersionRequest::default();
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .get_current_version(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    /// Reserves `number` new SST object ids; returns the allocated id range.
    async fn get_new_sst_ids(&self, number: u32) -> Result<SstObjectIdRange> {
        let resp = self
            .inner
            .get_new_sst_ids(GetNewSstIdsRequest { number })
            .await?;
        Ok(SstObjectIdRange::new(resp.start_id, resp.end_id))
    }

    /// Unsupported on the client: only the meta service commits epochs in production.
    async fn commit_epoch_with_change_log(
        &self,
        _epoch: HummockEpoch,
        _sync_result: SyncResult,
        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
    ) -> Result<()> {
        panic!("Only meta service can commit_epoch in production.")
    }

    /// Triggers a manual compaction for the given group/table/level/SSTs.
    async fn trigger_manual_compaction(
        &self,
        compaction_group_id: u64,
        table_id: u32,
        level: u32,
        sst_ids: Vec<u64>,
    ) -> Result<()> {
        // TODO: support key_range parameter
        let req = TriggerManualCompactionRequest {
            compaction_group_id,
            table_id,
            // if table_id not exist, manual_compaction will include all the sst
            // without check internal_table_id
            level,
            sst_ids,
            ..Default::default()
        };

        self.inner.trigger_manual_compaction(req).await?;
        Ok(())
    }

    /// Triggers a full GC honoring `sst_retention_time_sec`, optionally limited
    /// to object keys with `prefix`.
    async fn trigger_full_gc(
        &self,
        sst_retention_time_sec: u64,
        prefix: Option<String>,
    ) -> Result<()> {
        self.inner
            .trigger_full_gc(TriggerFullGcRequest {
                sst_retention_time_sec,
                prefix,
            })
            .await?;
        Ok(())
    }

    /// Opens the bidirectional compaction-event stream: queues a `Register`
    /// event for this worker first, then returns the request sender and the
    /// boxed response stream.
    async fn subscribe_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeCompactionEventRequest>,
        BoxStream<'static, CompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeCompactionEventRequest>();
        // The first message must register this worker as the compaction context.
        request_sender
            .send(SubscribeCompactionEventRequest {
                event: Some(subscribe_compaction_event_request::Event::Register(
                    Register {
                        context_id: self.worker_id(),
                    },
                )),
                // Creation timestamp in milliseconds since the Unix epoch.
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe compaction event")?;

        let stream = self
            .inner
            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }

    /// Delegates to the inherent [`MetaClient::get_version_by_epoch`].
    async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: u32,
    ) -> Result<PbHummockVersion> {
        self.get_version_by_epoch(epoch, table_id).await
    }
}
1678
1679#[async_trait]
1680impl TelemetryInfoFetcher for MetaClient {
1681    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
1682        let resp = self
1683            .get_telemetry_info()
1684            .await
1685            .map_err(|e| e.to_report_string())?;
1686        let tracking_id = resp.get_tracking_id().ok();
1687        Ok(tracking_id.map(|id| id.to_owned()))
1688    }
1689}
1690
/// gRPC client for the sink coordination service over a tonic [`Channel`].
pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
1692
/// One generated gRPC client per meta-exposed service, all constructed from a
/// single shared tonic channel (see `GrpcMetaClientCore::new`).
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
}
1714
1715impl GrpcMetaClientCore {
1716    pub(crate) fn new(channel: Channel) -> Self {
1717        let cluster_client = ClusterServiceClient::new(channel.clone());
1718        let meta_member_client = MetaMemberClient::new(channel.clone());
1719        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
1720        let ddl_client =
1721            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1722        let hummock_client =
1723            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1724        let notification_client =
1725            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1726        let stream_client =
1727            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1728        let user_client = UserServiceClient::new(channel.clone());
1729        let scale_client =
1730            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1731        let backup_client = BackupServiceClient::new(channel.clone());
1732        let telemetry_client =
1733            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1734        let system_params_client = SystemParamsServiceClient::new(channel.clone());
1735        let session_params_client = SessionParamServiceClient::new(channel.clone());
1736        let serving_client = ServingServiceClient::new(channel.clone());
1737        let cloud_client = CloudServiceClient::new(channel.clone());
1738        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
1739        let event_log_client = EventLogServiceClient::new(channel.clone());
1740        let cluster_limit_client = ClusterLimitServiceClient::new(channel);
1741
1742        GrpcMetaClientCore {
1743            cluster_client,
1744            meta_member_client,
1745            heartbeat_client,
1746            ddl_client,
1747            hummock_client,
1748            notification_client,
1749            stream_client,
1750            user_client,
1751            scale_client,
1752            backup_client,
1753            telemetry_client,
1754            system_params_client,
1755            session_params_client,
1756            serving_client,
1757            cloud_client,
1758            sink_coordinate_client,
1759            event_log_client,
1760            cluster_limit_client,
1761        }
1762    }
1763}
1764
/// Client to meta server. Cloning the instance is lightweight.
///
/// It is a wrapper of tonic client. See [`crate::meta_rpc_client_method_impl`].
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    // Hands a oneshot result-sender to the member-monitor task to force an
    // immediate leader refresh and receive its outcome.
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    // Per-service clients for the current leader; swapped out wholesale by the
    // member monitor when the leader changes.
    core: Arc<RwLock<GrpcMetaClientCore>>,
}
1773
/// Client for the meta-member service, used to discover members and the leader.
type MetaMemberClient = MetaMemberServiceClient<Channel>;
1775
/// Known meta members when connecting via an address list.
struct MetaMemberGroup {
    // Maps member URI -> lazily-connected client (`None` until first used).
    // An LRU cache bounds the set; expired entries are never proactively removed.
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}
1779
/// State owned by the background task that tracks the meta leader and rebuilds
/// the client core when leadership moves.
struct MetaMemberManagement {
    // Shared handle to the client core that gets replaced on leader change.
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    // Left: single load-balanced endpoint; Right: explicit member list.
    members: Either<MetaMemberClient, MetaMemberGroup>,
    // URI of the leader the core is currently connected to.
    current_leader: http::Uri,
    meta_config: MetaConfig,
}
1786
impl MetaMemberManagement {
    /// Period of the member-refresh tick when running against a member list.
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

    /// Renders a `HostAddress` as an `http://host:port` URI.
    ///
    /// Panics if the rendered string does not parse as a URI; addresses
    /// reported by the meta service are expected to be well-formed.
    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
        format!("http://{}:{}", addr.host, addr.port)
            .parse()
            .unwrap()
    }

    /// Replaces the shared client core with a fresh one built on `channel`,
    /// atomically from the point of view of readers of `core_ref`.
    async fn recreate_core(&self, channel: Channel) {
        let mut core = self.core_ref.write().await;
        *core = GrpcMetaClientCore::new(channel);
    }

    /// Fetches the current member set, records newly-joined members, and — if
    /// the leader has changed — reconnects and swaps the client core over to it.
    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            // Load-balanced mode: a single client tells us who the leader is.
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            // List mode: try each known member in turn until one answers.
            Either::Right(member_group) => {
                let mut fetched_members = None;

                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        // Connect lazily and cache the client in the LRU entry
                        // so later refreshes reuse it.
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")?;

                        resp.into_inner().members
                    };

                    // Keep the last attempt's result (success or error) and
                    // stop at the first member that answered.
                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                // No entry at all means the member list was empty; an `Err`
                // entry means every member failed.
                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                // find new leader, and record any members we haven't seen yet
                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    // We don't clean any expired addrs here to deal with some extreme situations.
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            // Only reconnect when leadership actually moved.
            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                // Retry connecting for at most one leader lease, after which
                // leadership may have moved again anyway.
                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}
1895
impl GrpcMetaClient {
    // See `Endpoint::http2_keep_alive_interval`
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    // See `Endpoint::keep_alive_timeout`
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    // Retry base interval in ms for connecting to meta server.
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    // Max retry interval in ms for connecting to meta server (cap of the
    // exponential backoff, not a retry count).
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;

    /// Spawns the background task that keeps `self.core` connected to the
    /// current meta leader.
    ///
    /// The task refreshes membership on a periodic tick (member-list mode
    /// only) and whenever a caller pushes a oneshot sender through
    /// `force_refresh_receiver`; the refresh result is reported back through
    /// that sender. The task exits when the sender side of the channel closes.
    fn start_meta_member_monitor(
        &self,
        init_leader_addr: http::Uri,
        members: Either<MetaMemberClient, MetaMemberGroup>,
        force_refresh_receiver: Receiver<Sender<Result<()>>>,
        meta_config: MetaConfig,
    ) -> Result<()> {
        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
        let current_leader = init_leader_addr;

        // Periodic refresh only applies when there is a member list to scan;
        // a single load-balanced address refreshes on demand only.
        let enable_period_tick = matches!(members, Either::Right(_));

        let member_management = MetaMemberManagement {
            core_ref,
            members,
            current_leader,
            meta_config,
        };

        let mut force_refresh_receiver = force_refresh_receiver;

        tokio::spawn(async move {
            let mut member_management = member_management;
            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);

            loop {
                // `event` is `Some(sender)` for a forced refresh (result must
                // be reported back) and `None` for a periodic tick.
                let event: Option<Sender<Result<()>>> = if enable_period_tick {
                    tokio::select! {
                        _ = ticker.tick() => None,
                        result_sender = force_refresh_receiver.recv() => {
                            // Channel closed: the client was dropped.
                            if result_sender.is_none() {
                                break;
                            }

                            result_sender
                        },
                    }
                } else {
                    let result_sender = force_refresh_receiver.recv().await;

                    if result_sender.is_none() {
                        break;
                    }

                    result_sender
                };

                let tick_result = member_management.refresh_members().await;
                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(),  "refresh meta member client failed");
                }

                if let Some(sender) = event {
                    // ignore resp
                    let _resp = sender.send(tick_result);
                }
            }
        });

        Ok(())
    }

    /// Forces an immediate leader refresh by the monitor task and waits for
    /// its result. Fails if the monitor task has exited.
    async fn force_refresh_leader(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.member_monitor_event_sender
            .send(sender)
            .await
            .map_err(|e| anyhow!(e))?;

        receiver.await.map_err(|e| anyhow!(e))?
    }

    /// Connect to the meta server from `addrs`.
    ///
    /// Builds the initial channel, starts the member monitor, and performs one
    /// forced refresh so the returned client is pointed at the leader.
    pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result<Self> {
        let (channel, addr) = match strategy {
            MetaAddressStrategy::LoadBalance(addr) => {
                Self::try_build_rpc_channel(vec![addr.clone()]).await
            }
            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
        }?;
        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
        let client = GrpcMetaClient {
            member_monitor_event_sender: force_refresh_sender,
            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
        };

        let meta_member_client = client.core.read().await.meta_member_client.clone();
        let members = match strategy {
            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
            MetaAddressStrategy::List(addrs) => {
                // Seed the member cache with all configured addresses; the one
                // we just connected to keeps its live client.
                let mut members = LruCache::new(20);
                for addr in addrs {
                    members.put(addr.clone(), None);
                }
                members.put(addr.clone(), Some(meta_member_client));

                Either::Right(MetaMemberGroup { members })
            }
        };

        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config)?;

        client.force_refresh_leader().await?;

        Ok(client)
    }

    /// Turns a URI into a tonic `Endpoint` with the configured initial
    /// connection window size.
    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
    }

    /// Tries each address in order and returns the first channel that
    /// connects, together with the address it connected to.
    ///
    /// Errors if every address fails (last error is returned with context) or
    /// if `addrs` is empty.
    pub(crate) async fn try_build_rpc_channel(
        addrs: impl IntoIterator<Item = http::Uri>,
    ) -> Result<(Channel, http::Uri)> {
        let endpoints: Vec<_> = addrs
            .into_iter()
            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
            .collect();

        let mut last_error = None;

        for (endpoint, addr) in endpoints {
            match Self::connect_to_endpoint(endpoint).await {
                Ok(channel) => {
                    tracing::info!("Connect to meta server {} successfully", addr);
                    return Ok((channel, addr));
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying again",
                        addr,
                    );
                    last_error = Some(e);
                }
            }
        }

        if let Some(last_error) = last_error {
            Err(anyhow::anyhow!(last_error)
                .context("failed to connect to all meta servers")
                .into())
        } else {
            bail!("no meta server address provided")
        }
    }

    /// Connects to a single endpoint with keep-alive and a 5s connect timeout,
    /// wrapping the channel with connection monitoring.
    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
        let channel = endpoint
            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-meta-client", Default::default())
            .await?
            .wrapped();

        Ok(channel)
    }

    /// Returns a jittered exponential-backoff iterator whose cumulative delay
    /// stays below `high_bound` when `exceed` is false, or is allowed to just
    /// cross it (by at most one step) when `exceed` is true.
    pub(crate) fn retry_strategy_to_bound(
        high_bound: Duration,
        exceed: bool,
    ) -> impl Iterator<Item = Duration> {
        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
            .map(jitter);

        let mut sum = Duration::default();

        iter.take_while(move |duration| {
            sum += *duration;

            if exceed {
                sum < high_bound + *duration
            } else {
                sum < high_bound
            }
        })
    }
}
2087
/// Invokes `$macro` with the full table of meta RPCs, one entry per method:
/// `{ client_field, method_name, RequestType, ResponseType }`.
///
/// Used below with `meta_rpc_client_method_impl` to generate an inherent
/// wrapper method on [`GrpcMetaClient`] for every listed RPC.
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_sst_ids, GetNewSstIdsRequest, GetNewSstIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
        }
    };
}
2207
2208impl GrpcMetaClient {
2209    async fn refresh_client_if_needed(&self, code: Code) {
2210        if matches!(
2211            code,
2212            Code::Unknown | Code::Unimplemented | Code::Unavailable
2213        ) {
2214            tracing::debug!("matching tonic code {}", code);
2215            let (result_sender, result_receiver) = oneshot::channel();
2216            if self
2217                .member_monitor_event_sender
2218                .try_send(result_sender)
2219                .is_ok()
2220            {
2221                if let Ok(Err(e)) = result_receiver.await {
2222                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2223                }
2224            } else {
2225                tracing::debug!("skipping the current refresh, somewhere else is already doing it")
2226            }
2227        }
2228    }
2229}
2230
impl GrpcMetaClient {
    // Generates one inherent wrapper method per RPC listed in `for_all_meta_rpc!`.
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}