use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::id::{
    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, ViewId, WorkerId,
};
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::id::{ActorId, FragmentId, SourceId};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

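/// Client to the meta service, shared by all worker node types.
///
/// It bundles the inner gRPC client with the identity of the registered worker
/// (id, type, advertise address), the cluster id returned by the meta service,
/// and a flag marking that the client is shutting down. Cloning is intended to
/// be lightweight, as the heavier state is behind `Arc`s.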
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: WorkerId,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> WorkerId {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

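    /// Subscribe to notifications from the meta service for this worker, retrying with
    /// the bounded backoff strategy below until the subscription stream is established.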
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest { secret_id };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

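    /// Register the current node with the meta service and return the client together
    /// with the initial system parameters. Registration failure is fatal: the process
    /// exits instead of returning an error.
    ///
    /// Call shape only (a sketch; the arguments are assumed to be built by the caller):
    ///
    /// ```ignore
    /// let (meta_client, system_params) = MetaClient::register_new(
    ///     addr_strategy,
    ///     WorkerType::ComputeNode,
    ///     &advertise_addr,
    ///     property,
    ///     meta_config,
    /// )
    /// .await;
    /// ```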
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

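    /// Activate the registered worker so the meta service treats it as ready to serve,
    /// retrying with the same bounded backoff used during registration.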
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

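    /// Send a heartbeat for `node_id`. If the meta service reports the worker as unknown
    /// (e.g. already evicted) and the client is not shutting down, the process exits.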
    pub async fn send_heartbeat(&self, node_id: WorkerId) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest { table_id, cascade };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

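    /// Alter a per-database runtime parameter (currently the barrier interval in
    /// milliseconds or the checkpoint frequency), returning the catalog version to wait for.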
    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id: job_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()> {
        let request = AlterFragmentParallelismRequest {
            fragment_ids,
            parallelism,
        };

        self.inner.alter_fragment_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: JobId,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

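    /// Submit a plan that replaces an existing streaming job's fragment graph, used when
    /// the job's definition changes, and return the catalog version to wait for.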
    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
            table_id,
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest { index_id, cascade };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

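    /// Unregister the current node from the cluster and mark this client as shutting down,
    /// so a subsequent "unknown worker" heartbeat response does not terminate the process.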
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

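    /// Best-effort variant of `unregister`: failures are logged instead of returned.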
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = %self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = %self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[WorkerId],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

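    /// Spawn a background task that sends a heartbeat at least every `min_interval`, with
    /// each attempt bounded by a `min_interval * 3` timeout. Returns the join handle and a
    /// oneshot sender used to stop the loop.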
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        job_ids: &[JobId],
    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: job_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn get_fragment_vnodes(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
        let resp = self
            .inner
            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
            .await?;
        Ok(resp
            .actor_vnodes
            .into_iter()
            .map(|actor| (actor.actor_id, actor.vnode_indices))
            .collect())
    }

    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
        let resp = self
            .inner
            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
            .await?;
        Ok(resp.vnode_indices)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

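    /// Alter connector properties and secret references of an existing sink, passing along
    /// an optional connection reference.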
    pub async fn alter_sink_props(
        &self,
        sink_id: SinkId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_iceberg_table_props(
        &self,
        table_id: TableId,
        sink_id: SinkId,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: table_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::IcebergTable as i32,
            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
                sink_id,
                source_id,
            })),
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
        let req = SetSystemParamRequest { param, value };
        let resp = self.inner.set_system_param(req).await?;
        Ok(resp.params.map(SystemParamsReader::from))
    }

    pub async fn get_session_params(&self) -> Result<String> {
        let req = GetSessionParamsRequest {};
        let resp = self.inner.get_session_params(req).await?;
        Ok(resp.params)
    }

    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
        let req = SetSessionParamRequest { param, value };
        let resp = self.inner.set_session_param(req).await?;
        Ok(resp.param)
    }

    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
        let req = GetDdlProgressRequest {};
        let resp = self.inner.get_ddl_progress(req).await?;
        Ok(resp.ddl_progress)
    }

    pub async fn split_compaction_group(
        &self,
        group_id: CompactionGroupId,
        table_ids_to_new_group: &[TableId],
        partition_vnode_count: u32,
    ) -> Result<CompactionGroupId> {
        let req = SplitCompactionGroupRequest {
            group_id,
            table_ids: table_ids_to_new_group.to_vec(),
            partition_vnode_count,
        };
        let resp = self.inner.split_compaction_group(req).await?;
        Ok(resp.new_group_id)
    }

    pub async fn get_tables(
        &self,
        table_ids: Vec<TableId>,
        include_dropped_tables: bool,
    ) -> Result<HashMap<TableId, Table>> {
        let req = GetTablesRequest {
            table_ids,
            include_dropped_tables,
        };
        let resp = self.inner.get_tables(req).await?;
        Ok(resp.tables)
    }

    pub async fn list_serving_vnode_mappings(
        &self,
    ) -> Result<HashMap<FragmentId, (u32, WorkerSlotMapping)>> {
        let req = GetServingVnodeMappingsRequest {};
        let resp = self.inner.get_serving_vnode_mappings(req).await?;
        let mappings = resp
            .worker_slot_mappings
            .into_iter()
            .map(|p| {
                (
                    p.fragment_id,
                    (
                        resp.fragment_to_table
                            .get(&p.fragment_id)
                            .cloned()
                            .unwrap_or(0),
                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
                    ),
                )
            })
            .collect();
        Ok(mappings)
    }

    pub async fn risectl_list_compaction_status(
        &self,
    ) -> Result<(
        Vec<CompactStatus>,
        Vec<CompactTaskAssignment>,
        Vec<CompactTaskProgress>,
    )> {
        let req = RiseCtlListCompactionStatusRequest {};
        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
        Ok((
            resp.compaction_statuses,
            resp.task_assignment,
            resp.task_progress,
        ))
    }

    pub async fn get_compaction_score(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Result<Vec<PickerInfo>> {
        let req = GetCompactionScoreRequest {
            compaction_group_id,
        };
        let resp = self.inner.get_compaction_score(req).await?;
        Ok(resp.scores)
    }

    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
        let req = RiseCtlRebuildTableStatsRequest {};
        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
        Ok(())
    }

    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
        let req = ListBranchedObjectRequest {};
        let resp = self.inner.list_branched_object(req).await?;
        Ok(resp.branched_objects)
    }

    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
        let req = ListActiveWriteLimitRequest {};
        let resp = self.inner.list_active_write_limit(req).await?;
        Ok(resp.write_limits)
    }

    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
        let req = ListHummockMetaConfigRequest {};
        let resp = self.inner.list_hummock_meta_config(req).await?;
        Ok(resp.configs)
    }

    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
        let _resp = self
            .inner
            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
            .await?;

        Ok(())
    }

    pub async fn rw_cloud_validate_source(
        &self,
        source_type: SourceType,
        source_config: HashMap<String, String>,
    ) -> Result<RwCloudValidateSourceResponse> {
        let req = RwCloudValidateSourceRequest {
            source_type: source_type.into(),
            source_config,
        };
        let resp = self.inner.rw_cloud_validate_source(req).await?;
        Ok(resp)
    }

    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
        self.inner.core.read().await.sink_coordinate_client.clone()
    }

    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
        let req = ListCompactTaskAssignmentRequest {};
        let resp = self.inner.list_compact_task_assignment(req).await?;
        Ok(resp.task_assignment)
    }

    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
        let req = ListEventLogRequest::default();
        let resp = self.inner.list_event_log(req).await?;
        Ok(resp.event_logs)
    }

    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
        let req = ListCompactTaskProgressRequest {};
        let resp = self.inner.list_compact_task_progress(req).await?;
        Ok(resp.task_progress)
    }

    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }

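    /// Report a panic to the meta event log from a panic hook. Runs on a dedicated thread
    /// with its own current-thread runtime so it can block, and gives up after
    /// `timeout_millis` (default 1000 ms).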
    #[cfg(not(madsim))]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
        let event = event_log::EventWorkerNodePanic {
            worker_id: self.worker_id,
            worker_type: self.worker_type.into(),
            host_addr: Some(self.host_addr.to_protobuf()),
            panic_info: format!("{panic_info}"),
        };
        let grpc_meta_client = self.inner.clone();
        let _ = thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let req = AddEventLogRequest {
                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
            };
            rt.block_on(async {
                let _ = tokio::time::timeout(
                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
                    grpc_meta_client.add_event_log(req),
                )
                .await;
            });
        })
        .join();
    }

    pub async fn add_sink_fail_evet(
        &self,
        sink_id: SinkId,
        sink_name: String,
        connector: String,
        error: String,
    ) -> Result<()> {
        let event = event_log::EventSinkFail {
            sink_id,
            sink_name,
            connector,
            error,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::SinkFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn add_cdc_auto_schema_change_fail_event(
        &self,
        source_id: SourceId,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) -> Result<()> {
        let event = event_log::EventAutoSchemaChangeFail {
            table_id: source_id.as_cdc_table_id(),
            table_name,
            cdc_table_id,
            upstream_ddl,
            fail_info,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        let req = CancelCompactTaskRequest {
            task_id,
            task_status: task_status as _,
        };
        let resp = self.inner.cancel_compact_task(req).await?;
        Ok(resp.ret)
    }

    pub async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Result<PbHummockVersion> {
        let req = GetVersionByEpochRequest { epoch, table_id };
        let resp = self.inner.get_version_by_epoch(req).await?;
        Ok(resp.version.unwrap())
    }

    pub async fn get_cluster_limits(
        &self,
    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
        let req = GetClusterLimitsRequest {};
        let resp = self.inner.get_cluster_limits(req).await?;
        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
    }

    pub async fn merge_compaction_group(
        &self,
        left_group_id: CompactionGroupId,
        right_group_id: CompactionGroupId,
    ) -> Result<()> {
        let req = MergeCompactionGroupRequest {
            left_group_id,
            right_group_id,
        };
        self.inner.merge_compaction_group(req).await?;
        Ok(())
    }

    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
        let request = ListRateLimitsRequest {};
        let resp = self.inner.list_rate_limits(request).await?;
        Ok(resp.rate_limits)
    }

    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
        let request = ListCdcProgressRequest {};
        let resp = self.inner.list_cdc_progress(request).await?;
        Ok(resp.cdc_progress)
    }

    pub async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIcebergTableRequest {
            table_info: Some(table_job_info),
            sink_info: Some(sink_job_info),
            iceberg_source: Some(iceberg_source),
            if_not_exists,
        };

        let resp = self.inner.create_iceberg_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
        let request = ListIcebergTablesRequest {};
        let resp = self.inner.list_iceberg_tables(request).await?;
        Ok(resp.iceberg_tables)
    }

    pub async fn get_cluster_stack_trace(
        &self,
        actor_traces_format: ActorTracesFormat,
    ) -> Result<StackTraceResponse> {
        let request = StackTraceRequest {
            actor_traces_format: actor_traces_format as i32,
        };
        let resp = self.inner.stack_trace(request).await?;
        Ok(resp)
    }

    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
        self.inner.set_sync_log_store_aligned(request).await?;
        Ok(())
    }

    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
        self.inner.refresh(request).await
    }

    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
        let request = ListUnmigratedTablesRequest {};
        let resp = self.inner.list_unmigrated_tables(request).await?;
        Ok(resp
            .tables
            .into_iter()
            .map(|table| (table.table_id, table.table_name))
            .collect())
    }
}

1869#[async_trait]
1870impl HummockMetaClient for MetaClient {
1871 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1872 let req = UnpinVersionBeforeRequest {
1873 context_id: self.worker_id(),
1874 unpin_version_before: unpin_version_before.to_u64(),
1875 };
1876 self.inner.unpin_version_before(req).await?;
1877 Ok(())
1878 }
1879
1880 async fn get_current_version(&self) -> Result<HummockVersion> {
1881 let req = GetCurrentVersionRequest::default();
1882 Ok(HummockVersion::from_rpc_protobuf(
1883 &self
1884 .inner
1885 .get_current_version(req)
1886 .await?
1887 .current_version
1888 .unwrap(),
1889 ))
1890 }
1891
1892 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1893 let resp = self
1894 .inner
1895 .get_new_object_ids(GetNewObjectIdsRequest { number })
1896 .await?;
1897 Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1898 }
1899
1900 async fn commit_epoch_with_change_log(
1901 &self,
1902 _epoch: HummockEpoch,
1903 _sync_result: SyncResult,
1904 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1905 ) -> Result<()> {
1906 panic!("Only meta service can commit_epoch in production.")
1907 }
1908
1909 async fn trigger_manual_compaction(
1910 &self,
1911 compaction_group_id: u64,
1912 table_id: JobId,
1913 level: u32,
1914 sst_ids: Vec<u64>,
1915 ) -> Result<()> {
1916 let req = TriggerManualCompactionRequest {
1918 compaction_group_id,
1919 table_id,
1920 level,
1923 sst_ids,
1924 ..Default::default()
1925 };
1926
1927 self.inner.trigger_manual_compaction(req).await?;
1928 Ok(())
1929 }
1930
1931 async fn trigger_full_gc(
1932 &self,
1933 sst_retention_time_sec: u64,
1934 prefix: Option<String>,
1935 ) -> Result<()> {
1936 self.inner
1937 .trigger_full_gc(TriggerFullGcRequest {
1938 sst_retention_time_sec,
1939 prefix,
1940 })
1941 .await?;
1942 Ok(())
1943 }
1944
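    /// Opens the bidirectional compaction-event stream with the meta service. A `Register` event
    /// carrying this worker's id is sent up front, and the caller gets back the request sender
    /// together with the response stream.
    ///
    /// A usage sketch (marked `ignore`): it assumes a connected `meta_client` with
    /// `HummockMetaClient` in scope and uses `futures::StreamExt`; how each item is handled is up
    /// to the caller.
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let (tx, mut events) = meta_client.subscribe_compaction_event().await?;
    /// // Further `SubscribeCompactionEventRequest`s (e.g. task reports) can be pushed via `tx`.
    /// while let Some(item) = events.next().await {
    ///     // each `item` is a `CompactionEventItem`; handle or log it here
    /// }
    /// ```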
    async fn subscribe_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeCompactionEventRequest>,
        BoxStream<'static, CompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeCompactionEventRequest>();
        request_sender
            .send(SubscribeCompactionEventRequest {
                event: Some(subscribe_compaction_event_request::Event::Register(
                    Register {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe compaction event")?;

        let stream = self
            .inner
            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }

    async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Result<PbHummockVersion> {
        self.get_version_by_epoch(epoch, table_id).await
    }

    async fn subscribe_iceberg_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
        BoxStream<'static, IcebergCompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
        request_sender
            .send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
                    IcebergRegister {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe iceberg compaction event")?;

        let stream = self
            .inner
            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }
}

#[async_trait]
impl TelemetryInfoFetcher for MetaClient {
    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
        let resp = self
            .get_telemetry_info()
            .await
            .map_err(|e| e.to_report_string())?;
        let tracking_id = resp.get_tracking_id().ok();
        Ok(tracking_id.map(|id| id.to_owned()))
    }
}

pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;

#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}

impl GrpcMetaClientCore {
    pub(crate) fn new(channel: Channel) -> Self {
        let cluster_client = ClusterServiceClient::new(channel.clone());
        let meta_member_client = MetaMemberClient::new(channel.clone());
        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
        let ddl_client =
            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let hummock_client =
            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let notification_client =
            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let stream_client =
            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let user_client = UserServiceClient::new(channel.clone());
        let scale_client =
            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let backup_client = BackupServiceClient::new(channel.clone());
        let telemetry_client =
            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let system_params_client = SystemParamsServiceClient::new(channel.clone());
        let session_params_client = SessionParamServiceClient::new(channel.clone());
        let serving_client = ServingServiceClient::new(channel.clone());
        let cloud_client = CloudServiceClient::new(channel.clone());
        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
            .max_decoding_message_size(usize::MAX);
        let event_log_client = EventLogServiceClient::new(channel.clone());
        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
        let hosted_iceberg_catalog_service_client =
            HostedIcebergCatalogServiceClient::new(channel.clone());
        let monitor_client = MonitorServiceClient::new(channel);

        GrpcMetaClientCore {
            cluster_client,
            meta_member_client,
            heartbeat_client,
            ddl_client,
            hummock_client,
            notification_client,
            stream_client,
            user_client,
            scale_client,
            backup_client,
            telemetry_client,
            system_params_client,
            session_params_client,
            serving_client,
            cloud_client,
            sink_coordinate_client,
            event_log_client,
            cluster_limit_client,
            hosted_iceberg_catalog_service_client,
            monitor_client,
        }
    }
}

#[derive(Debug, Clone)]
struct GrpcMetaClient {
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    core: Arc<RwLock<GrpcMetaClientCore>>,
}

type MetaMemberClient = MetaMemberServiceClient<Channel>;

struct MetaMemberGroup {
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}

struct MetaMemberManagement {
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    members: Either<MetaMemberClient, MetaMemberGroup>,
    current_leader: http::Uri,
    meta_config: Arc<MetaConfig>,
}

impl MetaMemberManagement {
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
        format!("http://{}:{}", addr.host, addr.port)
            .parse()
            .unwrap()
    }

    async fn recreate_core(&self, channel: Channel) {
        let mut core = self.core_ref.write().await;
        *core = GrpcMetaClientCore::new(channel);
    }

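    /// Refreshes the known meta members and, when a new leader is discovered, reconnects and
    /// rebuilds the underlying gRPC clients against it.
    ///
    /// With `Either::Left` (load-balanced address) the single member client is queried directly;
    /// with `Either::Right` (address list) each cached member is tried in turn until one returns
    /// the member list, and any newly reported member is added to the LRU cache.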
    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            Either::Right(member_group) => {
                let mut fetched_members = None;

                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")?;

                        resp.into_inner().members
                    };

                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}

impl GrpcMetaClient {
    // HTTP/2 keep-alive settings applied to each endpoint in `connect_to_endpoint`.
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    // Base and maximum delay (in milliseconds) of the exponential backoff built by
    // `retry_strategy_to_bound` when (re)connecting to the meta server.
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;

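    /// Spawns the background task that keeps the meta leader up to date.
    ///
    /// When the client was built from an address list, membership is refreshed both on a fixed
    /// tick (`META_MEMBER_REFRESH_PERIOD`) and on demand; with a load-balanced address it is only
    /// refreshed when a force-refresh request arrives. The task exits once the force-refresh
    /// channel is closed.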
    fn start_meta_member_monitor(
        &self,
        init_leader_addr: http::Uri,
        members: Either<MetaMemberClient, MetaMemberGroup>,
        force_refresh_receiver: Receiver<Sender<Result<()>>>,
        meta_config: Arc<MetaConfig>,
    ) -> Result<()> {
        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
        let current_leader = init_leader_addr;

        let enable_period_tick = matches!(members, Either::Right(_));

        let member_management = MetaMemberManagement {
            core_ref,
            members,
            current_leader,
            meta_config,
        };

        let mut force_refresh_receiver = force_refresh_receiver;

        tokio::spawn(async move {
            let mut member_management = member_management;
            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);

            loop {
                let event: Option<Sender<Result<()>>> = if enable_period_tick {
                    tokio::select! {
                        _ = ticker.tick() => None,
                        result_sender = force_refresh_receiver.recv() => {
                            if result_sender.is_none() {
                                break;
                            }

                            result_sender
                        },
                    }
                } else {
                    let result_sender = force_refresh_receiver.recv().await;

                    if result_sender.is_none() {
                        break;
                    }

                    result_sender
                };

                let tick_result = member_management.refresh_members().await;
                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
                }

                if let Some(sender) = event {
                    let _resp = sender.send(tick_result);
                }
            }
        });

        Ok(())
    }

    async fn force_refresh_leader(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.member_monitor_event_sender
            .send(sender)
            .await
            .map_err(|e| anyhow!(e))?;

        receiver.await.map_err(|e| anyhow!(e))?
    }

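    /// Connects to the meta service, starts the member monitor, and forces an initial leader
    /// refresh before returning the client.
    ///
    /// A minimal calling sketch (marked `ignore`); the address and the default `MetaConfig` are
    /// placeholders rather than values taken from a real deployment:
    ///
    /// ```ignore
    /// let strategy = MetaAddressStrategy::List(vec!["http://127.0.0.1:5690".parse()?]);
    /// let config = Arc::new(MetaConfig::default());
    /// let client = GrpcMetaClient::new(&strategy, config).await?;
    /// ```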
    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
        let (channel, addr) = match strategy {
            MetaAddressStrategy::LoadBalance(addr) => {
                Self::try_build_rpc_channel(vec![addr.clone()]).await
            }
            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
        }?;
        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
        let client = GrpcMetaClient {
            member_monitor_event_sender: force_refresh_sender,
            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
        };

        let meta_member_client = client.core.read().await.meta_member_client.clone();
        let members = match strategy {
            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
            MetaAddressStrategy::List(addrs) => {
                let mut members = LruCache::new(20.try_into().unwrap());
                for addr in addrs {
                    members.put(addr.clone(), None);
                }
                members.put(addr.clone(), Some(meta_member_client));

                Either::Right(MetaMemberGroup { members })
            }
        };

        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;

        client.force_refresh_leader().await?;

        Ok(client)
    }

    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
    }

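    /// Tries each address in order and returns the first channel that connects, together with the
    /// address that succeeded; an error is surfaced only after every address has failed.
    ///
    /// Illustrative sketch only (marked `ignore`; the addresses are placeholders):
    ///
    /// ```ignore
    /// let (channel, addr) = GrpcMetaClient::try_build_rpc_channel(vec![
    ///     "http://meta-1:5690".parse()?,
    ///     "http://meta-2:5690".parse()?,
    /// ])
    /// .await?;
    /// ```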
    pub(crate) async fn try_build_rpc_channel(
        addrs: impl IntoIterator<Item = http::Uri>,
    ) -> Result<(Channel, http::Uri)> {
        let endpoints: Vec<_> = addrs
            .into_iter()
            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
            .collect();

        let mut last_error = None;

        for (endpoint, addr) in endpoints {
            match Self::connect_to_endpoint(endpoint).await {
                Ok(channel) => {
                    tracing::info!("Connected to meta server {}", addr);
                    return Ok((channel, addr));
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying again",
                        addr,
                    );
                    last_error = Some(e);
                }
            }
        }

        if let Some(last_error) = last_error {
            Err(anyhow::anyhow!(last_error)
                .context("failed to connect to all meta servers")
                .into())
        } else {
            bail!("no meta server address provided")
        }
    }

    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
        let channel = endpoint
            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-meta-client", Default::default())
            .await?
            .wrapped();

        Ok(channel)
    }

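    /// Builds a jittered exponential backoff whose accumulated delay stays below `high_bound`;
    /// with `exceed = true` one extra step is allowed, so the final retry may land just past the
    /// bound.
    ///
    /// A rough usage sketch (marked `ignore`; `endpoint` is a placeholder), mirroring how the
    /// member monitor retries leader connections:
    ///
    /// ```ignore
    /// let strategy = GrpcMetaClient::retry_strategy_to_bound(Duration::from_secs(30), false);
    /// let channel = tokio_retry::Retry::spawn(strategy, || async {
    ///     GrpcMetaClient::connect_to_endpoint(endpoint.clone()).await
    /// })
    /// .await?;
    /// ```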
    pub(crate) fn retry_strategy_to_bound(
        high_bound: Duration,
        exceed: bool,
    ) -> impl Iterator<Item = Duration> {
        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
            .map(jitter);

        let mut sum = Duration::default();

        iter.take_while(move |duration| {
            sum += *duration;

            if exceed {
                sum < high_bound + *duration
            } else {
                sum < high_bound
            }
        })
    }
}

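/// Enumerates every meta RPC as a `{ client_field, method, Request, Response }` row and hands the
/// whole table to the given callback macro (here `meta_rpc_client_method_impl`, defined elsewhere
/// in this crate).
///
/// A hedged sketch of what the callback presumably generates per row; the real expansion lives in
/// `meta_rpc_client_method_impl` and may differ, e.g. around error handling and the
/// client-refresh hook:
///
/// ```ignore
/// pub async fn add_worker_node(
///     &self,
///     request: AddWorkerNodeRequest,
/// ) -> Result<AddWorkerNodeResponse> {
///     let mut client = self.core.read().await.cluster_client.clone();
///     Ok(client.add_worker_node(request).await?.into_inner())
/// }
/// ```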
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
            { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse }
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse }
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}

impl GrpcMetaClient {
    async fn refresh_client_if_needed(&self, code: Code) {
        if matches!(
            code,
            Code::Unknown | Code::Unimplemented | Code::Unavailable
        ) {
            tracing::debug!("matching tonic code {}", code);
            let (result_sender, result_receiver) = oneshot::channel();
            if self
                .member_monitor_event_sender
                .try_send(result_sender)
                .is_ok()
            {
                if let Ok(Err(e)) = result_receiver.await {
                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
                }
            } else {
                tracing::debug!("skipping this refresh since another refresh is already in progress")
            }
        }
    }
}

impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}