risingwave_rpc_client/
meta_client.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::id::{
    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, UserId, ViewId, WorkerId,
};
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::change_log::{TableChangeLog, TableChangeLogs};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::configured_monitor_service_client;
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::get_table_change_logs_request::PbTableFilter;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::id::{ActorId, FragmentId, HummockSstableId, SourceId};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

/// Client to the meta server. Cloning the instance is lightweight.
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: WorkerId,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> WorkerId {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

    /// Subscribe to notifications from the meta service.
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }
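
    // A minimal, illustrative sketch of consuming the subscription stream (hypothetical call
    // site, not part of this file; assumes a frontend-style subscriber and simply propagates
    // errors; `handle_notification` is a placeholder handler):
    //
    // ```ignore
    // let mut stream = meta_client.subscribe(SubscribeType::Frontend).await?;
    // while let Some(resp) = stream.message().await? {
    //     handle_notification(resp);
    // }
    // ```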

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSecretRequest { secret_id, cascade };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    /// Register the current node to the cluster and set the corresponding worker id.
    ///
    /// Retry if there's a connection issue with the meta node. Exit the process if the registration fails.
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }
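
    // A minimal usage sketch for registration and activation (illustrative only; the
    // addresses, worker type, and default property/config values below are assumptions,
    // not values used elsewhere in this file):
    //
    // ```ignore
    // let (meta_client, system_params) = MetaClient::register_new(
    //     "http://127.0.0.1:5690".parse().unwrap(), // MetaAddressStrategy
    //     WorkerType::ComputeNode,
    //     &"127.0.0.1:5688".parse().unwrap(),       // HostAddr of this worker
    //     Property::default(),
    //     Arc::new(MetaConfig::default()),
    // )
    // .await;
    // meta_client.activate(&"127.0.0.1:5688".parse().unwrap()).await?;
    // ```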

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        // Retry until reaching `max_heartbeat_interval_secs`
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Only retry if there's any transient connection issue.
            // If the error is from our implementation or business, do not retry it.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report panic event to meta node.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

    /// Activate the current node in the cluster to confirm that it's ready to serve.
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

    /// Send heartbeat signal to meta service.
    pub async fn send_heartbeat(&self) -> Result<()> {
        let request = HeartbeatRequest {
            node_id: self.worker_id,
            resource: Some(risingwave_pb::common::worker_node::Resource {
                rw_version: RW_VERSION.to_owned(),
                total_memory_bytes: system_memory_available_bytes() as _,
                total_cpu_cores: total_cpu_available() as _,
                hostname: hostname(),
            }),
        };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Ignore the error if we're already shutting down.
            // Otherwise, exit the process.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            resource_type: Some(PbStreamingJobResourceType {
                resource_type: Some(resource_type),
            }),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest { table_id, cascade };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> Result<WaitVersion> {
        let request = AlterSubscriptionRetentionRequest {
            subscription_id,
            retention_seconds,
            definition,
        };
        let resp = self.inner.alter_subscription_retention(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id: job_id,
            parallelism: Some(parallelism),
            deferred,
            adaptive_parallelism_strategy: None,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<PbTableParallelism>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterBackfillParallelismRequest {
            table_id: job_id,
            parallelism,
            deferred,
            adaptive_parallelism_strategy: None,
        };

        self.inner.alter_backfill_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()> {
        let request = AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        };

        self.inner.alter_streaming_job_config(request).await?;
        Ok(())
    }

    pub async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()> {
        let request = AlterFragmentParallelismRequest {
            fragment_ids,
            parallelism,
        };

        self.inner.alter_fragment_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: JobId,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        // TODO: handle error in `resp.status` here
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
            table_id,
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
        let request = ResetSourceRequest { source_id };
        let resp = self.inner.reset_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest { index_id, cascade };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    // TODO: use `UserInfoVersion` as the return type instead.
    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: UserId) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: UserId,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        granted_by: UserId,
        revoke_by: UserId,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<UserId>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: UserId,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

    /// Unregister the current node from the cluster.
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    /// Try to unregister the current worker from the cluster with best effort. Log the result.
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = %self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = %self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

    /// Starts a heartbeat worker.
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Wait for interval
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    // TODO: decide better min_interval for timeout
                    min_interval * 3,
                    meta_client.send_heartbeat(),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }
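
    // Illustrative wiring of the heartbeat loop (hypothetical call site; the 1-second
    // interval below is an assumption, not a value mandated by this file). Shutdown is
    // requested by signalling the returned oneshot sender:
    //
    // ```ignore
    // let (heartbeat_handle, heartbeat_shutdown) =
    //     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    // // ... later, on shutdown:
    // let _ = heartbeat_shutdown.send(());
    // heartbeat_handle.await.unwrap();
    // ```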
1131
1132    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1133        let request = RisectlListStateTablesRequest {};
1134        let resp = self.inner.risectl_list_state_tables(request).await?;
1135        Ok(resp.tables)
1136    }
1137
1138    pub async fn risectl_resume_backfill(
1139        &self,
1140        request: RisectlResumeBackfillRequest,
1141    ) -> Result<()> {
1142        self.inner.risectl_resume_backfill(request).await?;
1143        Ok(())
1144    }
1145
1146    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1147        let request = FlushRequest { database_id };
1148        let resp = self.inner.flush(request).await?;
1149        Ok(resp.hummock_version_id)
1150    }
1151
1152    pub async fn wait(&self, job_id: Option<JobId>) -> Result<WaitVersion> {
1153        let request = WaitRequest { job_id };
1154        let resp = self.inner.wait(request).await?;
1155        Ok(resp
1156            .version
1157            .ok_or_else(|| anyhow!("wait version not set"))?)
1158    }
1159
1160    pub async fn recover(&self) -> Result<()> {
1161        let request = RecoverRequest {};
1162        self.inner.recover(request).await?;
1163        Ok(())
1164    }
1165
1166    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1167        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1168        let resp = self.inner.cancel_creating_jobs(request).await?;
1169        Ok(resp.canceled_jobs)
1170    }
1171
1172    pub async fn list_table_fragments(
1173        &self,
1174        job_ids: &[JobId],
1175    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
1176        let request = ListTableFragmentsRequest {
1177            table_ids: job_ids.to_vec(),
1178        };
1179        let resp = self.inner.list_table_fragments(request).await?;
1180        Ok(resp.table_fragments)
1181    }
1182
1183    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1184        let resp = self
1185            .inner
1186            .list_streaming_job_states(ListStreamingJobStatesRequest {})
1187            .await?;
1188        Ok(resp.states)
1189    }
1190
1191    pub async fn list_fragment_distributions(
1192        &self,
1193        include_node: bool,
1194    ) -> Result<Vec<FragmentDistribution>> {
1195        let resp = self
1196            .inner
1197            .list_fragment_distribution(ListFragmentDistributionRequest {
1198                include_node: Some(include_node),
1199            })
1200            .await?;
1201        Ok(resp.distributions)
1202    }
1203
1204    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
1205        let resp = self
1206            .inner
1207            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {
1208                include_node: Some(true),
1209            })
1210            .await?;
1211        Ok(resp.distributions)
1212    }
1213
1214    pub async fn get_fragment_by_id(
1215        &self,
1216        fragment_id: FragmentId,
1217    ) -> Result<Option<FragmentDistribution>> {
1218        let resp = self
1219            .inner
1220            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1221            .await?;
1222        Ok(resp.distribution)
1223    }
1224
1225    pub async fn get_fragment_vnodes(
1226        &self,
1227        fragment_id: FragmentId,
1228    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
1229        let resp = self
1230            .inner
1231            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
1232            .await?;
1233        Ok(resp
1234            .actor_vnodes
1235            .into_iter()
1236            .map(|actor| (actor.actor_id, actor.vnode_indices))
1237            .collect())
1238    }
1239
1240    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
1241        let resp = self
1242            .inner
1243            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
1244            .await?;
1245        Ok(resp.vnode_indices)
1246    }
1247
1248    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1249        let resp = self
1250            .inner
1251            .list_actor_states(ListActorStatesRequest {})
1252            .await?;
1253        Ok(resp.states)
1254    }
1255
1256    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1257        let resp = self
1258            .inner
1259            .list_actor_splits(ListActorSplitsRequest {})
1260            .await?;
1261
1262        Ok(resp.actor_splits)
1263    }
1264
1265    pub async fn pause(&self) -> Result<PauseResponse> {
1266        let request = PauseRequest {};
1267        let resp = self.inner.pause(request).await?;
1268        Ok(resp)
1269    }
1270
1271    pub async fn resume(&self) -> Result<ResumeResponse> {
1272        let request = ResumeRequest {};
1273        let resp = self.inner.resume(request).await?;
1274        Ok(resp)
1275    }
1276
1277    pub async fn apply_throttle(
1278        &self,
1279        throttle_target: PbThrottleTarget,
1280        throttle_type: risingwave_pb::common::PbThrottleType,
1281        id: u32,
1282        rate: Option<u32>,
1283    ) -> Result<ApplyThrottleResponse> {
1284        let request = ApplyThrottleRequest {
1285            throttle_target: throttle_target as i32,
1286            throttle_type: throttle_type as i32,
1287            id,
1288            rate,
1289        };
1290        let resp = self.inner.apply_throttle(request).await?;
1291        Ok(resp)
1292    }
1293
1294    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1295        let resp = self
1296            .inner
1297            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1298            .await?;
1299        Ok(resp.get_status().unwrap())
1300    }
1301
1302    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1303        let request = GetClusterInfoRequest {};
1304        let resp = self.inner.get_cluster_info(request).await?;
1305        Ok(resp)
1306    }
1307
1308    pub async fn reschedule(
1309        &self,
1310        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1311        revision: u64,
1312        resolve_no_shuffle_upstream: bool,
1313    ) -> Result<(bool, u64)> {
1314        let request = RescheduleRequest {
1315            revision,
1316            resolve_no_shuffle_upstream,
1317            worker_reschedules,
1318        };
1319        let resp = self.inner.reschedule(request).await?;
1320        Ok((resp.success, resp.revision))
1321    }
1322
1323    pub async fn risectl_get_pinned_versions_summary(
1324        &self,
1325    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1326        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1327        self.inner
1328            .rise_ctl_get_pinned_versions_summary(request)
1329            .await
1330    }
1331
1332    pub async fn risectl_get_checkpoint_hummock_version(
1333        &self,
1334    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1335        let request = RiseCtlGetCheckpointVersionRequest {};
1336        self.inner.rise_ctl_get_checkpoint_version(request).await
1337    }
1338
1339    pub async fn risectl_pause_hummock_version_checkpoint(
1340        &self,
1341    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1342        let request = RiseCtlPauseVersionCheckpointRequest {};
1343        self.inner.rise_ctl_pause_version_checkpoint(request).await
1344    }
1345
1346    pub async fn risectl_resume_hummock_version_checkpoint(
1347        &self,
1348    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1349        let request = RiseCtlResumeVersionCheckpointRequest {};
1350        self.inner.rise_ctl_resume_version_checkpoint(request).await
1351    }
1352
1353    pub async fn init_metadata_for_replay(
1354        &self,
1355        tables: Vec<PbTable>,
1356        compaction_groups: Vec<CompactionGroupInfo>,
1357    ) -> Result<()> {
1358        let req = InitMetadataForReplayRequest {
1359            tables,
1360            compaction_groups,
1361        };
1362        let _resp = self.inner.init_metadata_for_replay(req).await?;
1363        Ok(())
1364    }
1365
1366    pub async fn replay_version_delta(
1367        &self,
1368        version_delta: HummockVersionDelta,
1369    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1370        let req = ReplayVersionDeltaRequest {
1371            version_delta: Some(version_delta.into()),
1372        };
1373        let resp = self.inner.replay_version_delta(req).await?;
1374        Ok((
1375            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1376            resp.modified_compaction_groups,
1377        ))
1378    }
1379
1380    pub async fn list_version_deltas(
1381        &self,
1382        start_id: HummockVersionId,
1383        num_limit: u32,
1384        committed_epoch_limit: HummockEpoch,
1385    ) -> Result<Vec<HummockVersionDelta>> {
1386        let req = ListVersionDeltasRequest {
1387            start_id,
1388            num_limit,
1389            committed_epoch_limit,
1390        };
1391        Ok(self
1392            .inner
1393            .list_version_deltas(req)
1394            .await?
1395            .version_deltas
1396            .unwrap()
1397            .version_deltas
1398            .iter()
1399            .map(HummockVersionDelta::from_rpc_protobuf)
1400            .collect())
1401    }
1402
1403    pub async fn trigger_compaction_deterministic(
1404        &self,
1405        version_id: HummockVersionId,
1406        compaction_groups: Vec<CompactionGroupId>,
1407    ) -> Result<()> {
1408        let req = TriggerCompactionDeterministicRequest {
1409            version_id,
1410            compaction_groups,
1411        };
1412        self.inner.trigger_compaction_deterministic(req).await?;
1413        Ok(())
1414    }
1415
1416    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1417        let req = DisableCommitEpochRequest {};
1418        Ok(HummockVersion::from_rpc_protobuf(
1419            &self
1420                .inner
1421                .disable_commit_epoch(req)
1422                .await?
1423                .current_version
1424                .unwrap(),
1425        ))
1426    }
1427
1428    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1429        let req = GetAssignedCompactTaskNumRequest {};
1430        let resp = self.inner.get_assigned_compact_task_num(req).await?;
1431        Ok(resp.num_tasks as usize)
1432    }
1433
1434    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1435        let req = RiseCtlListCompactionGroupRequest {};
1436        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1437        Ok(resp.compaction_groups)
1438    }
1439
1440    pub async fn risectl_update_compaction_config(
1441        &self,
1442        compaction_groups: &[CompactionGroupId],
1443        configs: &[MutableConfig],
1444    ) -> Result<()> {
1445        let req = RiseCtlUpdateCompactionConfigRequest {
1446            compaction_group_ids: compaction_groups.to_vec(),
1447            configs: configs
1448                .iter()
1449                .map(
1450                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
1451                        mutable_config: Some(c.clone()),
1452                    },
1453                )
1454                .collect(),
1455        };
1456        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1457        Ok(())
1458    }
1459
1460    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1461        let req = BackupMetaRequest { remarks };
1462        let resp = self.inner.backup_meta(req).await?;
1463        Ok(resp.job_id)
1464    }
1465
1466    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1467        let req = GetBackupJobStatusRequest { job_id };
1468        let resp = self.inner.get_backup_job_status(req).await?;
1469        Ok((resp.job_status(), resp.message))
1470    }
1471
1472    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1473        let req = DeleteMetaSnapshotRequest {
1474            snapshot_ids: snapshot_ids.to_vec(),
1475        };
1476        let _resp = self.inner.delete_meta_snapshot(req).await?;
1477        Ok(())
1478    }
1479
1480    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1481        let req = GetMetaSnapshotManifestRequest {};
1482        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1483        Ok(resp.manifest.expect("should exist"))
1484    }
1485
1486    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1487        let req = GetTelemetryInfoRequest {};
1488        let resp = self.inner.get_telemetry_info(req).await?;
1489        Ok(resp)
1490    }
1491
1492    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1493        let req = GetMetaStoreInfoRequest {};
1494        let resp = self.inner.get_meta_store_info(req).await?;
1495        Ok(resp.meta_store_endpoint)
1496    }
1497
1498    pub async fn alter_sink_props(
1499        &self,
1500        sink_id: SinkId,
1501        changed_props: BTreeMap<String, String>,
1502        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1503        connector_conn_ref: Option<ConnectionId>,
1504    ) -> Result<()> {
1505        let req = AlterConnectorPropsRequest {
1506            object_id: sink_id.as_raw_id(),
1507            changed_props: changed_props.into_iter().collect(),
1508            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1509            connector_conn_ref,
1510            object_type: AlterConnectorPropsObject::Sink as i32,
1511            extra_options: None,
1512        };
1513        let _resp = self.inner.alter_connector_props(req).await?;
1514        Ok(())
1515    }
1516
1517    pub async fn alter_iceberg_table_props(
1518        &self,
1519        table_id: TableId,
1520        sink_id: SinkId,
1521        source_id: SourceId,
1522        changed_props: BTreeMap<String, String>,
1523        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1524        connector_conn_ref: Option<ConnectionId>,
1525    ) -> Result<()> {
1526        let req = AlterConnectorPropsRequest {
1527            object_id: table_id.as_raw_id(),
1528            changed_props: changed_props.into_iter().collect(),
1529            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1530            connector_conn_ref,
1531            object_type: AlterConnectorPropsObject::IcebergTable as i32,
1532            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1533                sink_id,
1534                source_id,
1535            })),
1536        };
1537        let _resp = self.inner.alter_connector_props(req).await?;
1538        Ok(())
1539    }
1540
1541    pub async fn alter_source_connector_props(
1542        &self,
1543        source_id: SourceId,
1544        changed_props: BTreeMap<String, String>,
1545        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1546        connector_conn_ref: Option<ConnectionId>,
1547    ) -> Result<()> {
1548        let req = AlterConnectorPropsRequest {
1549            object_id: source_id.as_raw_id(),
1550            changed_props: changed_props.into_iter().collect(),
1551            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1552            connector_conn_ref,
1553            object_type: AlterConnectorPropsObject::Source as i32,
1554            extra_options: None,
1555        };
1556        let _resp = self.inner.alter_connector_props(req).await?;
1557        Ok(())
1558    }
1559
1560    pub async fn alter_connection_connector_props(
1561        &self,
1562        connection_id: u32,
1563        changed_props: BTreeMap<String, String>,
1564        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1565    ) -> Result<()> {
1566        let req = AlterConnectorPropsRequest {
1567            object_id: connection_id,
1568            changed_props: changed_props.into_iter().collect(),
1569            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1570            connector_conn_ref: None, // Connections don't reference other connections
1571            object_type: AlterConnectorPropsObject::Connection as i32,
1572            extra_options: None,
1573        };
1574        let _resp = self.inner.alter_connector_props(req).await?;
1575        Ok(())
1576    }
1577
1578    /// Orchestrated source property update with a pause/update/resume workflow.
1579    /// This is the "safe" variant: it pauses the source before applying the update and resumes it afterwards.
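    ///
    /// A minimal usage sketch (illustration only; assumes an in-scope `client: MetaClient`,
    /// an existing `source_id`, and a connector that understands the shown property key):
    ///
    /// ```ignore
    /// let mut props = BTreeMap::new();
    /// props.insert("properties.bootstrap.server".to_owned(), "broker:9092".to_owned());
    /// client
    ///     .alter_source_properties_safe(source_id, props, BTreeMap::new(), false)
    ///     .await?;
    /// ```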
1580    pub async fn alter_source_properties_safe(
1581        &self,
1582        source_id: SourceId,
1583        changed_props: BTreeMap<String, String>,
1584        changed_secret_refs: BTreeMap<String, PbSecretRef>,
1585        reset_splits: bool,
1586    ) -> Result<()> {
1587        let req = AlterSourcePropertiesSafeRequest {
1588            source_id: source_id.as_raw_id(),
1589            changed_props: changed_props.into_iter().collect(),
1590            changed_secret_refs: changed_secret_refs.into_iter().collect(),
1591            options: Some(PropertyUpdateOptions { reset_splits }),
1592        };
1593        let _resp = self.inner.alter_source_properties_safe(req).await?;
1594        Ok(())
1595    }
1596
1597    /// Reset source split assignments (UNSAFE - admin only).
1598    /// This clears persisted split metadata and triggers re-discovery.
1599    pub async fn reset_source_splits(&self, source_id: SourceId) -> Result<()> {
1600        let req = ResetSourceSplitsRequest {
1601            source_id: source_id.as_raw_id(),
1602        };
1603        let _resp = self.inner.reset_source_splits(req).await?;
1604        Ok(())
1605    }
1606
1607    /// Inject specific offsets into source splits (UNSAFE - admin only).
1608    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
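    ///
    /// A minimal sketch of the expected map shape (illustrative split id and offset only;
    /// both are connector-specific strings):
    ///
    /// ```ignore
    /// let mut split_offsets = HashMap::new();
    /// // split id -> offset to resume from
    /// split_offsets.insert("0".to_owned(), "12345".to_owned());
    /// let applied = client.inject_source_offsets(source_id, split_offsets).await?;
    /// ```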
1609    pub async fn inject_source_offsets(
1610        &self,
1611        source_id: SourceId,
1612        split_offsets: HashMap<String, String>,
1613    ) -> Result<Vec<String>> {
1614        let req = InjectSourceOffsetsRequest {
1615            source_id: source_id.as_raw_id(),
1616            split_offsets,
1617        };
1618        let resp = self.inner.inject_source_offsets(req).await?;
1619        Ok(resp.applied_split_ids)
1620    }
1621
1622    pub async fn set_system_param(
1623        &self,
1624        param: String,
1625        value: Option<String>,
1626    ) -> Result<Option<SystemParamsReader>> {
1627        let req = SetSystemParamRequest { param, value };
1628        let resp = self.inner.set_system_param(req).await?;
1629        Ok(resp.params.map(SystemParamsReader::from))
1630    }
1631
1632    pub async fn get_session_params(&self) -> Result<String> {
1633        let req = GetSessionParamsRequest {};
1634        let resp = self.inner.get_session_params(req).await?;
1635        Ok(resp.params)
1636    }
1637
1638    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1639        let req = SetSessionParamRequest { param, value };
1640        let resp = self.inner.set_session_param(req).await?;
1641        Ok(resp.param)
1642    }
1643
1644    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1645        let req = GetDdlProgressRequest {};
1646        let resp = self.inner.get_ddl_progress(req).await?;
1647        Ok(resp.ddl_progress)
1648    }
1649
1650    pub async fn split_compaction_group(
1651        &self,
1652        group_id: CompactionGroupId,
1653        table_ids_to_new_group: &[TableId],
1654        partition_vnode_count: u32,
1655    ) -> Result<CompactionGroupId> {
1656        let req = SplitCompactionGroupRequest {
1657            group_id,
1658            table_ids: table_ids_to_new_group.to_vec(),
1659            partition_vnode_count,
1660        };
1661        let resp = self.inner.split_compaction_group(req).await?;
1662        Ok(resp.new_group_id)
1663    }
1664
1665    pub async fn get_tables(
1666        &self,
1667        table_ids: Vec<TableId>,
1668        include_dropped_tables: bool,
1669    ) -> Result<HashMap<TableId, Table>> {
1670        let req = GetTablesRequest {
1671            table_ids,
1672            include_dropped_tables,
1673        };
1674        let resp = self.inner.get_tables(req).await?;
1675        Ok(resp.tables)
1676    }
1677
1678    pub async fn list_serving_vnode_mappings(
1679        &self,
1680    ) -> Result<HashMap<FragmentId, (JobId, WorkerSlotMapping)>> {
1681        let req = GetServingVnodeMappingsRequest {};
1682        let resp = self.inner.get_serving_vnode_mappings(req).await?;
1683        let mappings = resp
1684            .worker_slot_mappings
1685            .into_iter()
1686            .map(|p| {
1687                (
1688                    p.fragment_id,
1689                    (
1690                        resp.fragment_to_table
1691                            .get(&p.fragment_id)
1692                            .cloned()
1693                            .unwrap_or_default(),
1694                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1695                    ),
1696                )
1697            })
1698            .collect();
1699        Ok(mappings)
1700    }
1701
1702    pub async fn risectl_list_compaction_status(
1703        &self,
1704    ) -> Result<(
1705        Vec<CompactStatus>,
1706        Vec<CompactTaskAssignment>,
1707        Vec<CompactTaskProgress>,
1708    )> {
1709        let req = RiseCtlListCompactionStatusRequest {};
1710        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1711        Ok((
1712            resp.compaction_statuses,
1713            resp.task_assignment,
1714            resp.task_progress,
1715        ))
1716    }
1717
1718    pub async fn get_compaction_score(
1719        &self,
1720        compaction_group_id: CompactionGroupId,
1721    ) -> Result<Vec<PickerInfo>> {
1722        let req = GetCompactionScoreRequest {
1723            compaction_group_id,
1724        };
1725        let resp = self.inner.get_compaction_score(req).await?;
1726        Ok(resp.scores)
1727    }
1728
1729    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1730        let req = RiseCtlRebuildTableStatsRequest {};
1731        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1732        Ok(())
1733    }
1734
1735    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1736        let req = ListBranchedObjectRequest {};
1737        let resp = self.inner.list_branched_object(req).await?;
1738        Ok(resp.branched_objects)
1739    }
1740
1741    pub async fn list_active_write_limit(&self) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
1742        let req = ListActiveWriteLimitRequest {};
1743        let resp = self.inner.list_active_write_limit(req).await?;
1744        Ok(resp.write_limits)
1745    }
1746
1747    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1748        let req = ListHummockMetaConfigRequest {};
1749        let resp = self.inner.list_hummock_meta_config(req).await?;
1750        Ok(resp.configs)
1751    }
1752
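    /// Fetch table change logs from the meta node, filtered by the given epoch range,
    /// table ids and entry limit.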
1753    pub async fn get_table_change_logs(
1754        &self,
1755        epoch_only: bool,
1756        start_epoch_inclusive: Option<u64>,
1757        end_epoch_inclusive: Option<u64>,
1758        table_ids: Option<HashSet<TableId>>,
1759        exclude_empty: bool,
1760        limit: Option<u32>,
1761    ) -> Result<TableChangeLogs> {
1762        let req = GetTableChangeLogsRequest {
1763            epoch_only,
1764            start_epoch_inclusive,
1765            end_epoch_inclusive,
1766            table_ids: table_ids.map(|iter| PbTableFilter {
1767                table_ids: iter.into_iter().collect::<Vec<_>>(),
1768            }),
1769            exclude_empty,
1770            limit,
1771        };
1772        let resp = self.inner.get_table_change_logs(req).await?;
1773        Ok(resp
1774            .table_change_logs
1775            .into_iter()
1776            .map(|(id, change_log)| (TableId::new(id), TableChangeLog::from_protobuf(&change_log)))
1777            .collect())
1778    }
1779
1780    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1781        let _resp = self
1782            .inner
1783            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1784            .await?;
1785
1786        Ok(())
1787    }
1788
1789    pub async fn rw_cloud_validate_source(
1790        &self,
1791        source_type: SourceType,
1792        source_config: HashMap<String, String>,
1793    ) -> Result<RwCloudValidateSourceResponse> {
1794        let req = RwCloudValidateSourceRequest {
1795            source_type: source_type.into(),
1796            source_config,
1797        };
1798        let resp = self.inner.rw_cloud_validate_source(req).await?;
1799        Ok(resp)
1800    }
1801
1802    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1803        self.inner.core.read().await.sink_coordinate_client.clone()
1804    }
1805
1806    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1807        let req = ListCompactTaskAssignmentRequest {};
1808        let resp = self.inner.list_compact_task_assignment(req).await?;
1809        Ok(resp.task_assignment)
1810    }
1811
1812    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1813        let req = ListEventLogRequest::default();
1814        let resp = self.inner.list_event_log(req).await?;
1815        Ok(resp.event_logs)
1816    }
1817
1818    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1819        let req = ListCompactTaskProgressRequest {};
1820        let resp = self.inner.list_compact_task_progress(req).await?;
1821        Ok(resp.task_progress)
1822    }
1823
1824    #[cfg(madsim)]
1825    pub fn try_add_panic_event_blocking(
1826        &self,
1827        panic_info: impl Display,
1828        timeout_millis: Option<u64>,
1829    ) {
1830    }
1831
1832    /// If `timeout_millis` is `None`, a default timeout is used.
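    /// Blocks the calling thread: the event is sent from a freshly spawned helper thread
    /// running its own single-threaded runtime, so this can be used from a panic hook
    /// outside of any async context.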
1833    #[cfg(not(madsim))]
1834    pub fn try_add_panic_event_blocking(
1835        &self,
1836        panic_info: impl Display,
1837        timeout_millis: Option<u64>,
1838    ) {
1839        let event = event_log::EventWorkerNodePanic {
1840            worker_id: self.worker_id,
1841            worker_type: self.worker_type.into(),
1842            host_addr: Some(self.host_addr.to_protobuf()),
1843            panic_info: format!("{panic_info}"),
1844        };
1845        let grpc_meta_client = self.inner.clone();
1846        let _ = thread::spawn(move || {
1847            let rt = tokio::runtime::Builder::new_current_thread()
1848                .enable_all()
1849                .build()
1850                .unwrap();
1851            let req = AddEventLogRequest {
1852                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1853            };
1854            rt.block_on(async {
1855                let _ = tokio::time::timeout(
1856                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
1857                    grpc_meta_client.add_event_log(req),
1858                )
1859                .await;
1860            });
1861        })
1862        .join();
1863    }
1864
1865    pub async fn add_sink_fail_evet(
1866        &self,
1867        sink_id: SinkId,
1868        sink_name: String,
1869        connector: String,
1870        error: String,
1871    ) -> Result<()> {
1872        let event = event_log::EventSinkFail {
1873            sink_id,
1874            sink_name,
1875            connector,
1876            error,
1877        };
1878        let req = AddEventLogRequest {
1879            event: Some(add_event_log_request::Event::SinkFail(event)),
1880        };
1881        self.inner.add_event_log(req).await?;
1882        Ok(())
1883    }
1884
1885    pub async fn add_cdc_auto_schema_change_fail_event(
1886        &self,
1887        source_id: SourceId,
1888        table_name: String,
1889        cdc_table_id: String,
1890        upstream_ddl: String,
1891        fail_info: String,
1892    ) -> Result<()> {
1893        let event = event_log::EventAutoSchemaChangeFail {
1894            table_id: source_id.as_cdc_table_id(),
1895            table_name,
1896            cdc_table_id,
1897            upstream_ddl,
1898            fail_info,
1899        };
1900        let req = AddEventLogRequest {
1901            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1902        };
1903        self.inner.add_event_log(req).await?;
1904        Ok(())
1905    }
1906
1907    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1908        let req = CancelCompactTaskRequest {
1909            task_id,
1910            task_status: task_status as _,
1911        };
1912        let resp = self.inner.cancel_compact_task(req).await?;
1913        Ok(resp.ret)
1914    }
1915
1916    pub async fn get_version_by_epoch(
1917        &self,
1918        epoch: HummockEpoch,
1919        table_id: TableId,
1920    ) -> Result<PbHummockVersion> {
1921        let req = GetVersionByEpochRequest { epoch, table_id };
1922        let resp = self.inner.get_version_by_epoch(req).await?;
1923        Ok(resp.version.unwrap())
1924    }
1925
1926    pub async fn get_cluster_limits(
1927        &self,
1928    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1929        let req = GetClusterLimitsRequest {};
1930        let resp = self.inner.get_cluster_limits(req).await?;
1931        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1932    }
1933
1934    pub async fn merge_compaction_group(
1935        &self,
1936        left_group_id: CompactionGroupId,
1937        right_group_id: CompactionGroupId,
1938    ) -> Result<()> {
1939        let req = MergeCompactionGroupRequest {
1940            left_group_id,
1941            right_group_id,
1942        };
1943        self.inner.merge_compaction_group(req).await?;
1944        Ok(())
1945    }
1946
1947    /// List all rate limits for sources and backfills
1948    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1949        let request = ListRateLimitsRequest {};
1950        let resp = self.inner.list_rate_limits(request).await?;
1951        Ok(resp.rate_limits)
1952    }
1953
1954    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1955        let request = ListCdcProgressRequest {};
1956        let resp = self.inner.list_cdc_progress(request).await?;
1957        Ok(resp.cdc_progress)
1958    }
1959
1960    pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1961        let request = ListRefreshTableStatesRequest {};
1962        let resp = self.inner.list_refresh_table_states(request).await?;
1963        Ok(resp.states)
1964    }
1965
1966    pub async fn list_iceberg_compaction_status(
1967        &self,
1968    ) -> Result<Vec<list_iceberg_compaction_status_response::IcebergCompactionStatus>> {
1969        let request = ListIcebergCompactionStatusRequest {};
1970        let resp = self.inner.list_iceberg_compaction_status(request).await?;
1971        Ok(resp.statuses)
1972    }
1973
1974    pub async fn list_sink_log_store_tables(
1975        &self,
1976    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1977        let request = ListSinkLogStoreTablesRequest {};
1978        let resp = self.inner.list_sink_log_store_tables(request).await?;
1979        Ok(resp.tables)
1980    }
1981
1982    pub async fn create_iceberg_table(
1983        &self,
1984        table_job_info: PbTableJobInfo,
1985        sink_job_info: PbSinkJobInfo,
1986        iceberg_source: PbSource,
1987        if_not_exists: bool,
1988    ) -> Result<WaitVersion> {
1989        let request = CreateIcebergTableRequest {
1990            table_info: Some(table_job_info),
1991            sink_info: Some(sink_job_info),
1992            iceberg_source: Some(iceberg_source),
1993            if_not_exists,
1994        };
1995
1996        let resp = Box::pin(self.inner.create_iceberg_table(request)).await?;
1997        Ok(resp
1998            .version
1999            .ok_or_else(|| anyhow!("wait version not set"))?)
2000    }
2001
2002    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
2003        let request = ListIcebergTablesRequest {};
2004        let resp = self.inner.list_iceberg_tables(request).await?;
2005        Ok(resp.iceberg_tables)
2006    }
2007
2008    /// Get the await tree of all nodes in the cluster.
2009    pub async fn get_cluster_stack_trace(
2010        &self,
2011        actor_traces_format: ActorTracesFormat,
2012    ) -> Result<StackTraceResponse> {
2013        let request = StackTraceRequest {
2014            actor_traces_format: actor_traces_format as i32,
2015        };
2016        let resp = self.inner.stack_trace(request).await?;
2017        Ok(resp)
2018    }
2019
2020    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
2021        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
2022        self.inner.set_sync_log_store_aligned(request).await?;
2023        Ok(())
2024    }
2025
2026    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
2027        self.inner.refresh(request).await
2028    }
2029
2030    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
2031        let request = ListUnmigratedTablesRequest {};
2032        let resp = self.inner.list_unmigrated_tables(request).await?;
2033        Ok(resp
2034            .tables
2035            .into_iter()
2036            .map(|table| (table.table_id, table.table_name))
2037            .collect())
2038    }
2039}
2040
2041#[async_trait]
2042impl HummockMetaClient for MetaClient {
2043    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
2044        let req = UnpinVersionBeforeRequest {
2045            context_id: self.worker_id(),
2046            unpin_version_before,
2047        };
2048        self.inner.unpin_version_before(req).await?;
2049        Ok(())
2050    }
2051
2052    async fn get_current_version(&self) -> Result<HummockVersion> {
2053        let req = GetCurrentVersionRequest::default();
2054        Ok(HummockVersion::from_rpc_protobuf(
2055            &self
2056                .inner
2057                .get_current_version(req)
2058                .await?
2059                .current_version
2060                .unwrap(),
2061        ))
2062    }
2063
2064    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
2065        let resp = self
2066            .inner
2067            .get_new_object_ids(GetNewObjectIdsRequest { number })
2068            .await?;
2069        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
2070    }
2071
2072    async fn commit_epoch_with_change_log(
2073        &self,
2074        _epoch: HummockEpoch,
2075        _sync_result: SyncResult,
2076        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
2077    ) -> Result<()> {
2078        panic!("Only meta service can commit_epoch in production.")
2079    }
2080
2081    async fn trigger_manual_compaction(
2082        &self,
2083        compaction_group_id: CompactionGroupId,
2084        table_id: JobId,
2085        level: u32,
2086        sst_ids: Vec<HummockSstableId>,
2087        exclusive: bool,
2088    ) -> Result<bool> {
2089        // TODO: support key_range parameter
2090        let req = TriggerManualCompactionRequest {
2091            compaction_group_id,
2092            table_id,
2093            // If `table_id` does not exist, manual compaction will include all the SSTs
2094            // without checking `internal_table_id`.
2095            level,
2096            sst_ids,
2097            exclusive: Some(exclusive),
2098            ..Default::default()
2099        };
2100
2101        let resp = self.inner.trigger_manual_compaction(req).await?;
2102        Ok(resp.should_retry.unwrap_or(false))
2103    }
2104
2105    async fn trigger_full_gc(
2106        &self,
2107        sst_retention_time_sec: u64,
2108        prefix: Option<String>,
2109    ) -> Result<()> {
2110        self.inner
2111            .trigger_full_gc(TriggerFullGcRequest {
2112                sst_retention_time_sec,
2113                prefix,
2114            })
2115            .await?;
2116        Ok(())
2117    }
2118
2119    async fn subscribe_compaction_event(
2120        &self,
2121    ) -> Result<(
2122        UnboundedSender<SubscribeCompactionEventRequest>,
2123        BoxStream<'static, CompactionEventItem>,
2124    )> {
2125        let (request_sender, request_receiver) =
2126            unbounded_channel::<SubscribeCompactionEventRequest>();
2127        request_sender
2128            .send(SubscribeCompactionEventRequest {
2129                event: Some(subscribe_compaction_event_request::Event::Register(
2130                    Register {
2131                        context_id: self.worker_id(),
2132                    },
2133                )),
2134                create_at: SystemTime::now()
2135                    .duration_since(SystemTime::UNIX_EPOCH)
2136                    .expect("Clock may have gone backwards")
2137                    .as_millis() as u64,
2138            })
2139            .context("Failed to subscribe compaction event")?;
2140
2141        let stream = self
2142            .inner
2143            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
2144                request_receiver,
2145            )))
2146            .await?;
2147
2148        Ok((request_sender, Box::pin(stream)))
2149    }
2150
2151    async fn get_version_by_epoch(
2152        &self,
2153        epoch: HummockEpoch,
2154        table_id: TableId,
2155    ) -> Result<PbHummockVersion> {
2156        self.get_version_by_epoch(epoch, table_id).await
2157    }
2158
2159    async fn subscribe_iceberg_compaction_event(
2160        &self,
2161    ) -> Result<(
2162        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2163        BoxStream<'static, IcebergCompactionEventItem>,
2164    )> {
2165        let (request_sender, request_receiver) =
2166            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2167        request_sender
2168            .send(SubscribeIcebergCompactionEventRequest {
2169                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2170                    IcebergRegister {
2171                        context_id: self.worker_id(),
2172                    },
2173                )),
2174                create_at: SystemTime::now()
2175                    .duration_since(SystemTime::UNIX_EPOCH)
2176                    .expect("Clock may have gone backwards")
2177                    .as_millis() as u64,
2178            })
2179            .context("Failed to subscribe iceberg compaction event")?;
2180
2181        let stream = self
2182            .inner
2183            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2184                request_receiver,
2185            )))
2186            .await?;
2187
2188        Ok((request_sender, Box::pin(stream)))
2189    }
2190
2191    async fn get_table_change_logs(
2192        &self,
2193        epoch_only: bool,
2194        start_epoch_inclusive: Option<u64>,
2195        end_epoch_inclusive: Option<u64>,
2196        table_ids: Option<HashSet<TableId>>,
2197        exclude_empty: bool,
2198        limit: Option<u32>,
2199    ) -> Result<TableChangeLogs> {
2200        self.get_table_change_logs(
2201            epoch_only,
2202            start_epoch_inclusive,
2203            end_epoch_inclusive,
2204            table_ids,
2205            exclude_empty,
2206            limit,
2207        )
2208        .await
2209    }
2210}
2211
2212#[async_trait]
2213impl TelemetryInfoFetcher for MetaClient {
2214    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2215        let resp = self
2216            .get_telemetry_info()
2217            .await
2218            .map_err(|e| e.to_report_string())?;
2219        let tracking_id = resp.get_tracking_id().ok();
2220        Ok(tracking_id.map(|id| id.to_owned()))
2221    }
2222}
2223
2224pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2225
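/// One tonic client per meta gRPC service, all sharing a single underlying [`Channel`].
/// The whole struct is rebuilt from a new channel whenever the client reconnects to a
/// (possibly different) meta leader.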
2226#[derive(Debug, Clone)]
2227struct GrpcMetaClientCore {
2228    cluster_client: ClusterServiceClient<Channel>,
2229    meta_member_client: MetaMemberServiceClient<Channel>,
2230    heartbeat_client: HeartbeatServiceClient<Channel>,
2231    ddl_client: DdlServiceClient<Channel>,
2232    hummock_client: HummockManagerServiceClient<Channel>,
2233    notification_client: NotificationServiceClient<Channel>,
2234    stream_client: StreamManagerServiceClient<Channel>,
2235    user_client: UserServiceClient<Channel>,
2236    scale_client: ScaleServiceClient<Channel>,
2237    backup_client: BackupServiceClient<Channel>,
2238    telemetry_client: TelemetryInfoServiceClient<Channel>,
2239    system_params_client: SystemParamsServiceClient<Channel>,
2240    session_params_client: SessionParamServiceClient<Channel>,
2241    serving_client: ServingServiceClient<Channel>,
2242    cloud_client: CloudServiceClient<Channel>,
2243    sink_coordinate_client: SinkCoordinationRpcClient,
2244    event_log_client: EventLogServiceClient<Channel>,
2245    cluster_limit_client: ClusterLimitServiceClient<Channel>,
2246    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
2247    monitor_client: MonitorServiceClient<Channel>,
2248}
2249
2250impl GrpcMetaClientCore {
2251    pub(crate) fn new(channel: Channel) -> Self {
2252        let cluster_client = ClusterServiceClient::new(channel.clone());
2253        let meta_member_client = MetaMemberClient::new(channel.clone());
2254        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2255        let ddl_client =
2256            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2257        let hummock_client =
2258            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2259        let notification_client =
2260            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2261        let stream_client =
2262            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2263        let user_client = UserServiceClient::new(channel.clone());
2264        let scale_client =
2265            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2266        let backup_client = BackupServiceClient::new(channel.clone());
2267        let telemetry_client =
2268            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2269        let system_params_client = SystemParamsServiceClient::new(channel.clone());
2270        let session_params_client = SessionParamServiceClient::new(channel.clone());
2271        let serving_client = ServingServiceClient::new(channel.clone());
2272        let cloud_client = CloudServiceClient::new(channel.clone());
2273        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2274            .max_decoding_message_size(usize::MAX);
2275        let event_log_client = EventLogServiceClient::new(channel.clone());
2276        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2277        let hosted_iceberg_catalog_service_client =
2278            HostedIcebergCatalogServiceClient::new(channel.clone());
2279        let monitor_client = configured_monitor_service_client(MonitorServiceClient::new(channel));
2280
2281        GrpcMetaClientCore {
2282            cluster_client,
2283            meta_member_client,
2284            heartbeat_client,
2285            ddl_client,
2286            hummock_client,
2287            notification_client,
2288            stream_client,
2289            user_client,
2290            scale_client,
2291            backup_client,
2292            telemetry_client,
2293            system_params_client,
2294            session_params_client,
2295            serving_client,
2296            cloud_client,
2297            sink_coordinate_client,
2298            event_log_client,
2299            cluster_limit_client,
2300            hosted_iceberg_catalog_service_client,
2301            monitor_client,
2302        }
2303    }
2304}
2305
2306/// Client to meta server. Cloning the instance is lightweight.
2307///
2308/// It is a wrapper of tonic client. See [`crate::meta_rpc_client_method_impl`].
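/// All per-service clients share one underlying channel; a background member-monitor task
/// swaps in a fresh [`GrpcMetaClientCore`] when the meta leader changes.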
2309#[derive(Debug, Clone)]
2310struct GrpcMetaClient {
2311    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
2312    core: Arc<RwLock<GrpcMetaClientCore>>,
2313}
2314
2315type MetaMemberClient = MetaMemberServiceClient<Channel>;
2316
2317struct MetaMemberGroup {
2318    members: LruCache<http::Uri, Option<MetaMemberClient>>,
2319}
2320
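/// Tracks the known meta members and the current leader. `refresh_members` re-queries the
/// membership (periodically or on demand), and when a new leader is discovered it reconnects
/// and replaces the shared [`GrpcMetaClientCore`].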
2321struct MetaMemberManagement {
2322    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
2323    members: Either<MetaMemberClient, MetaMemberGroup>,
2324    current_leader: http::Uri,
2325    meta_config: Arc<MetaConfig>,
2326}
2327
2328impl MetaMemberManagement {
2329    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2330
2331    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2332        format!("http://{}:{}", addr.host, addr.port)
2333            .parse()
2334            .unwrap()
2335    }
2336
2337    async fn recreate_core(&self, channel: Channel) {
2338        let mut core = self.core_ref.write().await;
2339        *core = GrpcMetaClientCore::new(channel);
2340    }
2341
2342    async fn refresh_members(&mut self) -> Result<()> {
2343        let leader_addr = match self.members.as_mut() {
2344            Either::Left(client) => {
2345                let resp = client
2346                    .to_owned()
2347                    .members(MembersRequest {})
2348                    .await
2349                    .map_err(RpcError::from_meta_status)?;
2350                let resp = resp.into_inner();
2351                resp.members.into_iter().find(|member| member.is_leader)
2352            }
2353            Either::Right(member_group) => {
2354                let mut fetched_members = None;
2355
2356                for (addr, client) in &mut member_group.members {
2357                    let members: Result<_> = try {
2358                        let mut client = match client {
2359                            Some(cached_client) => cached_client.to_owned(),
2360                            None => {
2361                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2362                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2363                                    .await
2364                                    .context("failed to create client")?;
2365                                let new_client: MetaMemberClient =
2366                                    MetaMemberServiceClient::new(channel);
2367                                *client = Some(new_client.clone());
2368
2369                                new_client
2370                            }
2371                        };
2372
2373                        let resp = client
2374                            .members(MembersRequest {})
2375                            .await
2376                            .context("failed to fetch members")?;
2377
2378                        resp.into_inner().members
2379                    };
2380
2381                    let fetched = members.is_ok();
2382                    fetched_members = Some(members);
2383                    if fetched {
2384                        break;
2385                    }
2386                }
2387
2388                let members = fetched_members
2389                    .context("no member available in the list")?
2390                    .context("could not refresh members")?;
2391
2392                // find new leader
2393                let mut leader = None;
2394                for member in members {
2395                    if member.is_leader {
2396                        leader = Some(member.clone());
2397                    }
2398
2399                    let addr = Self::host_address_to_uri(member.address.unwrap());
2400                    // We intentionally keep stale addresses here so that they can still be tried in extreme situations.
2401                    if !member_group.members.contains(&addr) {
2402                        tracing::info!("new meta member joined: {}", addr);
2403                        member_group.members.put(addr, None);
2404                    }
2405                }
2406
2407                leader
2408            }
2409        };
2410
2411        if let Some(leader) = leader_addr {
2412            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2413
2414            if discovered_leader != self.current_leader {
2415                tracing::info!("new meta leader {} discovered", discovered_leader);
2416
2417                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2418                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2419                    false,
2420                );
2421
2422                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2423                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2424                    GrpcMetaClient::connect_to_endpoint(endpoint).await
2425                })
2426                .await?;
2427
2428                self.recreate_core(channel).await;
2429                self.current_leader = discovered_leader;
2430            }
2431        }
2432
2433        Ok(())
2434    }
2435}
2436
2437impl GrpcMetaClient {
2438    // See `Endpoint::http2_keep_alive_interval`
2439    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2440    // See `Endpoint::keep_alive_timeout`
2441    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2442    // Retry base interval in ms for connecting to meta server.
2443    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2444    // Max retry interval in ms for connecting to meta server.
2445    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2446
2447    fn start_meta_member_monitor(
2448        &self,
2449        init_leader_addr: http::Uri,
2450        members: Either<MetaMemberClient, MetaMemberGroup>,
2451        force_refresh_receiver: Receiver<Sender<Result<()>>>,
2452        meta_config: Arc<MetaConfig>,
2453    ) -> Result<()> {
2454        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2455        let current_leader = init_leader_addr;
2456
2457        let enable_period_tick = matches!(members, Either::Right(_));
2458
2459        let member_management = MetaMemberManagement {
2460            core_ref,
2461            members,
2462            current_leader,
2463            meta_config,
2464        };
2465
2466        let mut force_refresh_receiver = force_refresh_receiver;
2467
2468        tokio::spawn(async move {
2469            let mut member_management = member_management;
2470            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2471
2472            loop {
2473                let event: Option<Sender<Result<()>>> = if enable_period_tick {
2474                    tokio::select! {
2475                        _ = ticker.tick() => None,
2476                        result_sender = force_refresh_receiver.recv() => {
2477                            if result_sender.is_none() {
2478                                break;
2479                            }
2480
2481                            result_sender
2482                        },
2483                    }
2484                } else {
2485                    let result_sender = force_refresh_receiver.recv().await;
2486
2487                    if result_sender.is_none() {
2488                        break;
2489                    }
2490
2491                    result_sender
2492                };
2493
2494                let tick_result = member_management.refresh_members().await;
2495                if let Err(e) = tick_result.as_ref() {
2496                    tracing::warn!(error = %e.as_report(), "failed to refresh meta members");
2497                }
2498
2499                if let Some(sender) = event {
2500                    // ignore resp
2501                    let _resp = sender.send(tick_result);
2502                }
2503            }
2504        });
2505
2506        Ok(())
2507    }
2508
2509    async fn force_refresh_leader(&self) -> Result<()> {
2510        let (sender, receiver) = oneshot::channel();
2511
2512        self.member_monitor_event_sender
2513            .send(sender)
2514            .await
2515            .map_err(|e| anyhow!(e))?;
2516
2517        receiver.await.map_err(|e| anyhow!(e))?
2518    }
2519
2520    /// Connect to the meta server using the given address `strategy`.
2521    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2522        let (channel, addr) = match strategy {
2523            MetaAddressStrategy::LoadBalance(addr) => {
2524                Self::try_build_rpc_channel(vec![addr.clone()]).await
2525            }
2526            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2527        }?;
2528        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2529        let client = GrpcMetaClient {
2530            member_monitor_event_sender: force_refresh_sender,
2531            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2532        };
2533
2534        let meta_member_client = client.core.read().await.meta_member_client.clone();
2535        let members = match strategy {
2536            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2537            MetaAddressStrategy::List(addrs) => {
2538                let mut members = LruCache::new(20.try_into().unwrap());
2539                for addr in addrs {
2540                    members.put(addr.clone(), None);
2541                }
2542                members.put(addr.clone(), Some(meta_member_client));
2543
2544                Either::Right(MetaMemberGroup { members })
2545            }
2546        };
2547
2548        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2549
2550        client.force_refresh_leader().await?;
2551
2552        Ok(client)
2553    }
2554
2555    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2556        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2557    }
2558
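    /// Try the given addresses in order and return the first channel that connects,
    /// together with the address it connected to.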
2559    pub(crate) async fn try_build_rpc_channel(
2560        addrs: impl IntoIterator<Item = http::Uri>,
2561    ) -> Result<(Channel, http::Uri)> {
2562        let endpoints: Vec<_> = addrs
2563            .into_iter()
2564            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2565            .collect();
2566
2567        let mut last_error = None;
2568
2569        for (endpoint, addr) in endpoints {
2570            match Self::connect_to_endpoint(endpoint).await {
2571                Ok(channel) => {
2572                    tracing::info!("Connected to meta server {} successfully", addr);
2573                    return Ok((channel, addr));
2574                }
2575                Err(e) => {
2576                    tracing::warn!(
2577                        error = %e.as_report(),
2578                        "Failed to connect to meta server {}, trying again",
2579                        addr,
2580                    );
2581                    last_error = Some(e);
2582                }
2583            }
2584        }
2585
2586        if let Some(last_error) = last_error {
2587            Err(anyhow::anyhow!(last_error)
2588                .context("failed to connect to all meta servers")
2589                .into())
2590        } else {
2591            bail!("no meta server address provided")
2592        }
2593    }
2594
2595    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2596        let channel = endpoint
2597            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2598            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2599            .connect_timeout(Duration::from_secs(5))
2600            .monitored_connect("grpc-meta-client", Default::default())
2601            .await?
2602            .wrapped();
2603
2604        Ok(channel)
2605    }
2606
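    /// Exponential backoff with jitter whose cumulative delay is bounded by `high_bound`.
    /// With `exceed` set, one extra retry past the bound is allowed; otherwise the total
    /// delay stays strictly below it.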
2607    pub(crate) fn retry_strategy_to_bound(
2608        high_bound: Duration,
2609        exceed: bool,
2610    ) -> impl Iterator<Item = Duration> {
2611        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2612            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2613            .map(jitter);
2614
2615        let mut sum = Duration::default();
2616
2617        iter.take_while(move |duration| {
2618            sum += *duration;
2619
2620            if exceed {
2621                sum < high_bound + *duration
2622            } else {
2623                sum < high_bound
2624            }
2625        })
2626    }
2627}
2628
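/// Enumerates every meta RPC as `{ client field, method, request type, response type }`.
/// Downstream macros (see [`crate::meta_rpc_client_method_impl`]) expand this list into the
/// corresponding RPC wrapper methods.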
2629macro_rules! for_all_meta_rpc {
2630    ($macro:ident) => {
2631        $macro! {
2632             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2633            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2634            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2635            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2636            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2637            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2638            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2639            ,{ stream_client, flush, FlushRequest, FlushResponse }
2640            ,{ stream_client, pause, PauseRequest, PauseResponse }
2641            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2642            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2643            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2644            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2645            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2646            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2647            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2648            ,{ stream_client, list_sink_log_store_tables, ListSinkLogStoreTablesRequest, ListSinkLogStoreTablesResponse }
2649            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2650            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2651            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2652            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2653            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
2654            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
2655            ,{ stream_client, list_iceberg_compaction_status, ListIcebergCompactionStatusRequest, ListIcebergCompactionStatusResponse }
2656            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2657            ,{ stream_client, alter_source_properties_safe, AlterSourcePropertiesSafeRequest, AlterSourcePropertiesSafeResponse }
2658            ,{ stream_client, reset_source_splits, ResetSourceSplitsRequest, ResetSourceSplitsResponse }
2659            ,{ stream_client, inject_source_offsets, InjectSourceOffsetsRequest, InjectSourceOffsetsResponse }
2660            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2661            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
2662            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
2663            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2664            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2665            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
2666            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2667            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2668            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2669            ,{ ddl_client, alter_subscription_retention, AlterSubscriptionRetentionRequest, AlterSubscriptionRetentionResponse }
2670            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2671            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2672            ,{ ddl_client, alter_backfill_parallelism, AlterBackfillParallelismRequest, AlterBackfillParallelismResponse }
2673            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
2674            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
2675            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2676            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2677            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2678            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2679            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2680            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2681            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2682            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2683            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2684            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2685            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2686            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2687            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2688            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2689            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2690            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2691            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2692            ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
2693            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2694            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2695            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2696            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2697            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2698            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2699            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2700            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2701            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2702            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2703            ,{ ddl_client, risectl_resume_backfill, RisectlResumeBackfillRequest, RisectlResumeBackfillResponse }
2704            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2705            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2706            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2707            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2708            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2709            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2710            ,{ ddl_client, wait, WaitRequest, WaitResponse }
2711            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2712            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2713            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2714            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2715            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
2716            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
2717            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2718            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse }
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ hummock_client, get_table_change_logs, GetTableChangeLogsRequest, GetTableChangeLogsResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}

impl GrpcMetaClient {
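    /// Ask the member monitor to rebuild the meta channel after an RPC has failed with a status
    /// code that typically indicates a broken connection or a stale meta leader (`Unknown`,
    /// `Unimplemented`, or `Unavailable`). The call waits for the refresh result and logs a
    /// warning if it fails; if `try_send` does not succeed (a refresh is already being handled),
    /// this one is skipped.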
    async fn refresh_client_if_needed(&self, code: Code) {
        if matches!(
            code,
            Code::Unknown | Code::Unimplemented | Code::Unavailable
        ) {
            tracing::debug!("matching tonic code {}", code);
            let (result_sender, result_receiver) = oneshot::channel();
            if self
                .member_monitor_event_sender
                .try_send(result_sender)
                .is_ok()
            {
                if let Ok(Err(e)) = result_receiver.await {
                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
                }
            } else {
                tracing::debug!("skipping the current refresh, somewhere else is already doing it")
            }
        }
    }
}
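// The `for_all_meta_rpc!` list above is expanded here through `meta_rpc_client_method_impl`,
// producing one async wrapper per `{ client, fn, Request, Response }` entry. Schematically
// (this is only an illustration of the shape, not the macro's exact output), an entry such as
// `{ user_client, create_user, CreateUserRequest, CreateUserResponse }` is assumed to expand to
// something like:
//
//     pub async fn create_user(&self, request: CreateUserRequest) -> Result<CreateUserResponse> {
//         // clone the cached `user_client` stub and issue the RPC ...
//         match user_client.create_user(request).await {
//             Ok(resp) => Ok(resp.into_inner()),
//             Err(status) => {
//                 self.refresh_client_if_needed(status.code()).await;
//                 Err(status.into()) // or an equivalent error conversion
//             }
//         }
//     }
//
// i.e. every generated method funnels failing status codes into `refresh_client_if_needed`
// so a broken meta connection can be re-established transparently.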
impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}