use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

type ConnectionId = u32;
type DatabaseId = u32;
type SchemaId = u32;

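/// Client to the meta service. Cloning is cheap: the inner gRPC clients share
/// their underlying channel, and the remaining fields are small or
/// reference-counted, so clones can be handed to background tasks freely.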
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: u32,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> u32 {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

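    /// Subscribe to notifications from the meta service, retrying with backoff
    /// for up to `max_heartbeat_interval_secs` before giving up.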
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest {
            secret_id: secret_id.into(),
        };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

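    /// Register the current node to the cluster and fetch the initial system
    /// parameters. On failure the process exits, since a worker cannot run
    /// without a successful registration.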
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Errors raised by the meta server implementation itself are not
            // transient, so only retry errors originating from the transport.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report the panic to the meta service before running
                // the default hook, which may abort the process.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

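    /// Activate the current node in the cluster. A worker is registered in a
    /// starting state and becomes fully available only after activation;
    /// retries are bounded by the same window as heartbeats.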
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

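    /// Send a heartbeat for `node_id`. If the meta service no longer
    /// recognizes the worker and we are not shutting down deliberately, the
    /// process exits.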
    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Ignore the error if we have already unregistered on purpose;
            // otherwise the worker has expired and cannot recover.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest {
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: u32,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: u32,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(SourceId::Id),
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: u32) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: u32) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: u32,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest {
            index_id: index_id.index_id,
            cascade,
        };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id: function_id.0,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

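    /// Unregister the current node from the cluster and mark this client as
    /// shutting down, so a subsequent `UnknownWorker` heartbeat response does
    /// not abort the process.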
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

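    /// Try to unregister, logging the outcome instead of propagating errors.
    /// Intended for shutdown paths where failures are tolerable.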
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[u32],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

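    /// Starts a heartbeat worker that reports liveness to the meta service
    /// every `min_interval`. Each heartbeat RPC is bounded by a timeout of
    /// `3 * min_interval`; failures are logged and the loop keeps running
    /// until the returned shutdown sender fires.
    ///
    /// A minimal usage sketch, using only the APIs declared in this file
    /// (the `meta_client` binding is hypothetical; not compiled as a doc test):
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    /// // ... on shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```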
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    // Bound each heartbeat RPC by a multiple of the interval
                    // so a stuck request cannot stall the loop.
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: table_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: u32,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    pub async fn alter_sink_props(
        &self,
        sink_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_iceberg_table_props(
        &self,
        table_id: u32,
        sink_id: u32,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: table_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::IcebergTable as i32,
            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
                sink_id: sink_id as i32,
                source_id: source_id as i32,
            })),
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
        let req = SetSystemParamRequest { param, value };
        let resp = self.inner.set_system_param(req).await?;
        Ok(resp.params.map(SystemParamsReader::from))
    }

    pub async fn get_session_params(&self) -> Result<String> {
        let req = GetSessionParamsRequest {};
        let resp = self.inner.get_session_params(req).await?;
        Ok(resp.params)
    }

    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
        let req = SetSessionParamRequest { param, value };
        let resp = self.inner.set_session_param(req).await?;
        Ok(resp.param)
    }

    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
        let req = GetDdlProgressRequest {};
        let resp = self.inner.get_ddl_progress(req).await?;
        Ok(resp.ddl_progress)
    }

    pub async fn split_compaction_group(
        &self,
        group_id: CompactionGroupId,
        table_ids_to_new_group: &[StateTableId],
        partition_vnode_count: u32,
    ) -> Result<CompactionGroupId> {
        let req = SplitCompactionGroupRequest {
            group_id,
            table_ids: table_ids_to_new_group.to_vec(),
            partition_vnode_count,
        };
        let resp = self.inner.split_compaction_group(req).await?;
        Ok(resp.new_group_id)
    }

    pub async fn get_tables(
        &self,
        table_ids: &[u32],
        include_dropped_tables: bool,
    ) -> Result<HashMap<u32, Table>> {
        let req = GetTablesRequest {
            table_ids: table_ids.to_vec(),
            include_dropped_tables,
        };
        let resp = self.inner.get_tables(req).await?;
        Ok(resp.tables)
    }

    pub async fn list_serving_vnode_mappings(
        &self,
    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
        let req = GetServingVnodeMappingsRequest {};
        let resp = self.inner.get_serving_vnode_mappings(req).await?;
        let mappings = resp
            .worker_slot_mappings
            .into_iter()
            .map(|p| {
                (
                    p.fragment_id,
                    (
                        resp.fragment_to_table
                            .get(&p.fragment_id)
                            .cloned()
                            .unwrap_or(0),
                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
                    ),
                )
            })
            .collect();
        Ok(mappings)
    }

    pub async fn risectl_list_compaction_status(
        &self,
    ) -> Result<(
        Vec<CompactStatus>,
        Vec<CompactTaskAssignment>,
        Vec<CompactTaskProgress>,
    )> {
        let req = RiseCtlListCompactionStatusRequest {};
        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
        Ok((
            resp.compaction_statuses,
            resp.task_assignment,
            resp.task_progress,
        ))
    }

    pub async fn get_compaction_score(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Result<Vec<PickerInfo>> {
        let req = GetCompactionScoreRequest {
            compaction_group_id,
        };
        let resp = self.inner.get_compaction_score(req).await?;
        Ok(resp.scores)
    }

    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
        let req = RiseCtlRebuildTableStatsRequest {};
        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
        Ok(())
    }

    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
        let req = ListBranchedObjectRequest {};
        let resp = self.inner.list_branched_object(req).await?;
        Ok(resp.branched_objects)
    }

    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
        let req = ListActiveWriteLimitRequest {};
        let resp = self.inner.list_active_write_limit(req).await?;
        Ok(resp.write_limits)
    }

    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
        let req = ListHummockMetaConfigRequest {};
        let resp = self.inner.list_hummock_meta_config(req).await?;
        Ok(resp.configs)
    }

    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
        let _resp = self
            .inner
            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
            .await?;

        Ok(())
    }

    pub async fn rw_cloud_validate_source(
        &self,
        source_type: SourceType,
        source_config: HashMap<String, String>,
    ) -> Result<RwCloudValidateSourceResponse> {
        let req = RwCloudValidateSourceRequest {
            source_type: source_type.into(),
            source_config,
        };
        let resp = self.inner.rw_cloud_validate_source(req).await?;
        Ok(resp)
    }

    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
        self.inner.core.read().await.sink_coordinate_client.clone()
    }

    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
        let req = ListCompactTaskAssignmentRequest {};
        let resp = self.inner.list_compact_task_assignment(req).await?;
        Ok(resp.task_assignment)
    }

    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
        let req = ListEventLogRequest::default();
        let resp = self.inner.list_event_log(req).await?;
        Ok(resp.event_logs)
    }

    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
        let req = ListCompactTaskProgressRequest {};
        let resp = self.inner.list_compact_task_progress(req).await?;
        Ok(resp.task_progress)
    }

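    /// Stub for simulation (`madsim`) builds, where the real implementation's
    /// approach of spawning a dedicated OS thread with its own runtime is not
    /// available.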
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }

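    /// Report a panic to the meta event-log service from a panic hook. It
    /// blocks on a freshly spawned thread with its own single-threaded
    /// runtime, because the hook may run outside any tokio context, and bounds
    /// the RPC by `timeout_millis` (default 1000 ms) so an unreachable meta
    /// service cannot hang the panicking process.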
    #[cfg(not(madsim))]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
        let event = event_log::EventWorkerNodePanic {
            worker_id: self.worker_id,
            worker_type: self.worker_type.into(),
            host_addr: Some(self.host_addr.to_protobuf()),
            panic_info: format!("{panic_info}"),
        };
        let grpc_meta_client = self.inner.clone();
        let _ = thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let req = AddEventLogRequest {
                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
            };
            rt.block_on(async {
                let _ = tokio::time::timeout(
                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
                    grpc_meta_client.add_event_log(req),
                )
                .await;
            });
        })
        .join();
    }

    pub async fn add_sink_fail_event(
        &self,
        sink_id: u32,
        sink_name: String,
        connector: String,
        error: String,
    ) -> Result<()> {
        let event = event_log::EventSinkFail {
            sink_id,
            sink_name,
            connector,
            error,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::SinkFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn add_cdc_auto_schema_change_fail_event(
        &self,
        table_id: u32,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) -> Result<()> {
        let event = event_log::EventAutoSchemaChangeFail {
            table_id,
            table_name,
            cdc_table_id,
            upstream_ddl,
            fail_info,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        let req = CancelCompactTaskRequest {
            task_id,
            task_status: task_status as _,
        };
        let resp = self.inner.cancel_compact_task(req).await?;
        Ok(resp.ret)
    }

    pub async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: u32,
    ) -> Result<PbHummockVersion> {
        let req = GetVersionByEpochRequest { epoch, table_id };
        let resp = self.inner.get_version_by_epoch(req).await?;
        Ok(resp.version.unwrap())
    }

    pub async fn get_cluster_limits(
        &self,
    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
        let req = GetClusterLimitsRequest {};
        let resp = self.inner.get_cluster_limits(req).await?;
        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
    }

    pub async fn merge_compaction_group(
        &self,
        left_group_id: CompactionGroupId,
        right_group_id: CompactionGroupId,
    ) -> Result<()> {
        let req = MergeCompactionGroupRequest {
            left_group_id,
            right_group_id,
        };
        self.inner.merge_compaction_group(req).await?;
        Ok(())
    }

    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
        let request = ListRateLimitsRequest {};
        let resp = self.inner.list_rate_limits(request).await?;
        Ok(resp.rate_limits)
    }

    pub async fn list_cdc_progress(&self) -> Result<HashMap<u32, PbCdcProgress>> {
        let request = ListCdcProgressRequest {};
        let resp = self.inner.list_cdc_progress(request).await?;
        Ok(resp.cdc_progress)
    }

    pub async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIcebergTableRequest {
            table_info: Some(table_job_info),
            sink_info: Some(sink_job_info),
            iceberg_source: Some(iceberg_source),
            if_not_exists,
        };

        let resp = self.inner.create_iceberg_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
        let request = ListIcebergTablesRequest {};
        let resp = self.inner.list_iceberg_tables(request).await?;
        Ok(resp.iceberg_tables)
    }

    pub async fn get_cluster_stack_trace(
        &self,
        actor_traces_format: ActorTracesFormat,
    ) -> Result<StackTraceResponse> {
        let request = StackTraceRequest {
            actor_traces_format: actor_traces_format as i32,
        };
        let resp = self.inner.stack_trace(request).await?;
        Ok(resp)
    }

    pub async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
        self.inner.set_sync_log_store_aligned(request).await?;
        Ok(())
    }

    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
        self.inner.refresh(request).await
    }
}

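/// Hummock (state store) RPCs issued by non-meta nodes. Committing epochs is
/// the meta service's own job, so `commit_epoch_with_change_log` deliberately
/// panics in this client.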
#[async_trait]
impl HummockMetaClient for MetaClient {
    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
        let req = UnpinVersionBeforeRequest {
            context_id: self.worker_id(),
            unpin_version_before: unpin_version_before.to_u64(),
        };
        self.inner.unpin_version_before(req).await?;
        Ok(())
    }

    async fn get_current_version(&self) -> Result<HummockVersion> {
        let req = GetCurrentVersionRequest::default();
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .get_current_version(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
        let resp = self
            .inner
            .get_new_object_ids(GetNewObjectIdsRequest { number })
            .await?;
        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
    }

    async fn commit_epoch_with_change_log(
        &self,
        _epoch: HummockEpoch,
        _sync_result: SyncResult,
        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
    ) -> Result<()> {
        panic!("Only meta service can commit_epoch in production.")
    }

    async fn trigger_manual_compaction(
        &self,
        compaction_group_id: u64,
        table_id: u32,
        level: u32,
        sst_ids: Vec<u64>,
    ) -> Result<()> {
        // Fields not set here keep their protobuf defaults.
        let req = TriggerManualCompactionRequest {
            compaction_group_id,
            table_id,
            level,
            sst_ids,
            ..Default::default()
        };

        self.inner.trigger_manual_compaction(req).await?;
        Ok(())
    }

    async fn trigger_full_gc(
        &self,
        sst_retention_time_sec: u64,
        prefix: Option<String>,
    ) -> Result<()> {
        self.inner
            .trigger_full_gc(TriggerFullGcRequest {
                sst_retention_time_sec,
                prefix,
            })
            .await?;
        Ok(())
    }

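    /// Open the bidirectional compaction-event stream: register this worker
    /// with the compaction scheduler first, then hand back the request sender
    /// and the response stream.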
    async fn subscribe_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeCompactionEventRequest>,
        BoxStream<'static, CompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeCompactionEventRequest>();
        request_sender
            .send(SubscribeCompactionEventRequest {
                event: Some(subscribe_compaction_event_request::Event::Register(
                    Register {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe compaction event")?;

        let stream = self
            .inner
            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }

    async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: u32,
    ) -> Result<PbHummockVersion> {
        // Delegates to the inherent `MetaClient::get_version_by_epoch`;
        // inherent methods take precedence over trait methods, so this call
        // is not recursive.
        self.get_version_by_epoch(epoch, table_id).await
    }

    async fn subscribe_iceberg_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
        BoxStream<'static, IcebergCompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
        request_sender
            .send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
                    IcebergRegister {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe iceberg compaction event")?;

        let stream = self
            .inner
            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }
}

#[async_trait]
impl TelemetryInfoFetcher for MetaClient {
    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
        let resp = self
            .get_telemetry_info()
            .await
            .map_err(|e| e.to_report_string())?;
        let tracking_id = resp.get_tracking_id().ok();
        Ok(tracking_id.map(|id| id.to_owned()))
    }
}

pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;

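/// A bundle of tonic clients, one per meta sub-service, all multiplexed over
/// the same gRPC channel.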
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}

impl GrpcMetaClientCore {
    pub(crate) fn new(channel: Channel) -> Self {
        let cluster_client = ClusterServiceClient::new(channel.clone());
        let meta_member_client = MetaMemberClient::new(channel.clone());
        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
        // Services that may return large payloads lift tonic's default 4 MiB
        // response decoding limit.
        let ddl_client =
            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let hummock_client =
            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let notification_client =
            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let stream_client =
            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let user_client = UserServiceClient::new(channel.clone());
        let scale_client =
            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let backup_client = BackupServiceClient::new(channel.clone());
        let telemetry_client =
            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let system_params_client = SystemParamsServiceClient::new(channel.clone());
        let session_params_client = SessionParamServiceClient::new(channel.clone());
        let serving_client = ServingServiceClient::new(channel.clone());
        let cloud_client = CloudServiceClient::new(channel.clone());
        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
            .max_decoding_message_size(usize::MAX);
        let event_log_client = EventLogServiceClient::new(channel.clone());
        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
        let hosted_iceberg_catalog_service_client =
            HostedIcebergCatalogServiceClient::new(channel.clone());
        let monitor_client = MonitorServiceClient::new(channel);

        GrpcMetaClientCore {
            cluster_client,
            meta_member_client,
            heartbeat_client,
            ddl_client,
            hummock_client,
            notification_client,
            stream_client,
            user_client,
            scale_client,
            backup_client,
            telemetry_client,
            system_params_client,
            session_params_client,
            serving_client,
            cloud_client,
            sink_coordinate_client,
            event_log_client,
            cluster_limit_client,
            hosted_iceberg_catalog_service_client,
            monitor_client,
        }
    }
}

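/// Client to the meta server over gRPC.
///
/// Cloning is cheap: clones share the same `GrpcMetaClientCore`. When a new
/// leader is discovered, the member-monitor task swaps the core for one built
/// on a channel to the new leader (see `MetaMemberManagement::recreate_core`).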
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    core: Arc<RwLock<GrpcMetaClientCore>>,
}

type MetaMemberClient = MetaMemberServiceClient<Channel>;

struct MetaMemberGroup {
    /// Known meta endpoints; the value is `None` until a client to that
    /// endpoint is lazily created.
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}

struct MetaMemberManagement {
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    members: Either<MetaMemberClient, MetaMemberGroup>,
    current_leader: http::Uri,
    meta_config: Arc<MetaConfig>,
}

impl MetaMemberManagement {
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
        format!("http://{}:{}", addr.host, addr.port)
            .parse()
            .expect("invalid meta member address")
    }

    async fn recreate_core(&self, channel: Channel) {
        let mut core = self.core_ref.write().await;
        *core = GrpcMetaClientCore::new(channel);
    }

    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            Either::Right(member_group) => {
                let mut fetched_members = None;

                // Try each known member in turn until one of them answers.
                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")?;

                        resp.into_inner().members
                    };

                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    // Stale addresses are not removed here; the LRU capacity
                    // of the cache bounds how many members are remembered.
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}

impl GrpcMetaClient {
    // See `Endpoint::http2_keep_alive_interval`.
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    // See `Endpoint::keep_alive_timeout`.
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    // Base retry interval for connecting to the meta server.
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    // Cap on the retry interval for connecting to the meta server.
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;

    fn start_meta_member_monitor(
        &self,
        init_leader_addr: http::Uri,
        members: Either<MetaMemberClient, MetaMemberGroup>,
        force_refresh_receiver: Receiver<Sender<Result<()>>>,
        meta_config: Arc<MetaConfig>,
    ) -> Result<()> {
        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
        let current_leader = init_leader_addr;

        // Periodic refresh is only needed with an explicit member list; with
        // a load balancer, refreshes happen only on demand.
        let enable_period_tick = matches!(members, Either::Right(_));

        let member_management = MetaMemberManagement {
            core_ref,
            members,
            current_leader,
            meta_config,
        };

        let mut force_refresh_receiver = force_refresh_receiver;

        tokio::spawn(async move {
            let mut member_management = member_management;
            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);

            loop {
                let event: Option<Sender<Result<()>>> = if enable_period_tick {
                    tokio::select! {
                        _ = ticker.tick() => None,
                        result_sender = force_refresh_receiver.recv() => {
                            if result_sender.is_none() {
                                break;
                            }

                            result_sender
                        },
                    }
                } else {
                    let result_sender = force_refresh_receiver.recv().await;

                    if result_sender.is_none() {
                        break;
                    }

                    result_sender
                };

                let tick_result = member_management.refresh_members().await;
                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
                }

                if let Some(sender) = event {
                    // The requester may have given up waiting; ignore send errors.
                    let _resp = sender.send(tick_result);
                }
            }
        });

        Ok(())
    }

    async fn force_refresh_leader(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.member_monitor_event_sender
            .send(sender)
            .await
            .map_err(|e| anyhow!(e))?;

        receiver.await.map_err(|e| anyhow!(e))?
    }

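    /// Connect to the meta server(s) described by `strategy`, start the
    /// member-monitor task, and force one leader refresh before returning.
    ///
    /// A minimal usage sketch (the address is illustrative, and
    /// `MetaConfig::default()` is assumed to be available):
    ///
    /// ```ignore
    /// let strategy = MetaAddressStrategy::List(vec!["http://127.0.0.1:5690".parse()?]);
    /// let client = GrpcMetaClient::new(&strategy, Arc::new(MetaConfig::default())).await?;
    /// ```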
    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
        let (channel, addr) = match strategy {
            MetaAddressStrategy::LoadBalance(addr) => {
                Self::try_build_rpc_channel(vec![addr.clone()]).await
            }
            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
        }?;
        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
        let client = GrpcMetaClient {
            member_monitor_event_sender: force_refresh_sender,
            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
        };

        let meta_member_client = client.core.read().await.meta_member_client.clone();
        let members = match strategy {
            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
            MetaAddressStrategy::List(addrs) => {
                let mut members = LruCache::new(20.try_into().unwrap());
                for addr in addrs {
                    members.put(addr.clone(), None);
                }
                members.put(addr.clone(), Some(meta_member_client));

                Either::Right(MetaMemberGroup { members })
            }
        };

        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;

        client.force_refresh_leader().await?;

        Ok(client)
    }

    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
    }

    pub(crate) async fn try_build_rpc_channel(
        addrs: impl IntoIterator<Item = http::Uri>,
    ) -> Result<(Channel, http::Uri)> {
        let endpoints: Vec<_> = addrs
            .into_iter()
            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
            .collect();

        let mut last_error = None;

        for (endpoint, addr) in endpoints {
            match Self::connect_to_endpoint(endpoint).await {
                Ok(channel) => {
                    tracing::info!("Connect to meta server {} successfully", addr);
                    return Ok((channel, addr));
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying the next address",
                        addr,
                    );
                    last_error = Some(e);
                }
            }
        }

        if let Some(last_error) = last_error {
            Err(anyhow::anyhow!(last_error)
                .context("failed to connect to all meta servers")
                .into())
        } else {
            bail!("no meta server address provided")
        }
    }

    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
        let channel = endpoint
            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-meta-client", Default::default())
            .await?
            .wrapped();

        Ok(channel)
    }

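    /// Builds an exponential-backoff retry schedule (with jitter) starting at
    /// `INIT_RETRY_BASE_INTERVAL_MS` and capped at `INIT_RETRY_MAX_INTERVAL_MS`.
    /// The iterator stops once the *accumulated* delay reaches `high_bound`;
    /// with `exceed` set, it may cross the bound by at most one final delay.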
    pub(crate) fn retry_strategy_to_bound(
        high_bound: Duration,
        exceed: bool,
    ) -> impl Iterator<Item = Duration> {
        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
            .map(jitter);

        let mut sum = Duration::default();

        iter.take_while(move |duration| {
            sum += *duration;

            if exceed {
                sum < high_bound + *duration
            } else {
                sum < high_bound
            }
        })
    }
}

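// `for_all_meta_rpc` enumerates every (sub-client, method, request, response)
// tuple exactly once; `meta_rpc_client_method_impl` (defined elsewhere in this
// crate) turns each entry into an async wrapper method. Roughly, each entry
// expands to something like the following sketch (illustrative, not the
// literal macro output):
//
//     pub async fn add_worker_node(
//         &self,
//         request: AddWorkerNodeRequest,
//     ) -> Result<AddWorkerNodeResponse> {
//         let mut client = self.core.read().await.cluster_client.clone();
//         client
//             .add_worker_node(request)
//             .await
//             .map(|resp| resp.into_inner())
//             .map_err(RpcError::from_meta_status)
//     }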
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse }
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse }
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}

impl GrpcMetaClient {
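    // Status codes that may indicate a stale leader connection trigger a
    // forced member refresh. The refresh channel has capacity 1, so when a
    // refresh is already in flight, `try_send` fails and this attempt is
    // simply skipped.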
    async fn refresh_client_if_needed(&self, code: Code) {
        if matches!(
            code,
            Code::Unknown | Code::Unimplemented | Code::Unavailable
        ) {
            tracing::debug!("tonic code {} may indicate a stale leader, refreshing", code);
            let (result_sender, result_receiver) = oneshot::channel();
            if self
                .member_monitor_event_sender
                .try_send(result_sender)
                .is_ok()
            {
                if let Ok(Err(e)) = result_receiver.await {
                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
                }
            } else {
                tracing::debug!("skipping this refresh, another one is already in flight")
            }
        }
    }
}

impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}