use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::id::{
    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, ViewId, WorkerId,
};
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::id::{ActorId, FragmentId, SourceId};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

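/// Client for interacting with the cluster meta service over gRPC.
///
/// Cloning a `MetaClient` is lightweight: the configuration and shutdown flag are shared
/// behind `Arc`s, and the inner gRPC client is shared as well.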
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: WorkerId,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> WorkerId {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

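    /// Subscribe to notifications of the given `subscribe_type` from the meta service,
    /// retrying on transient failures within the `max_heartbeat_interval_secs` bound.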
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest { secret_id };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

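    /// Register this node with the meta service and fetch the initial system parameters.
    ///
    /// Exits the process if registration still fails after retries.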
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

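    /// Activate the registered worker node at `addr`, retrying within the heartbeat
    /// interval bound, so that it can start serving.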
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

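    /// Send a heartbeat for `node_id`. Exits the process if the meta service no longer
    /// recognizes the worker while the client is not already shutting down.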
    pub async fn send_heartbeat(&self, node_id: WorkerId) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest { table_id, cascade };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id: job_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()> {
        let request = AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        };

        self.inner.alter_streaming_job_config(request).await?;
        Ok(())
    }

    pub async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()> {
        let request = AlterFragmentParallelismRequest {
            fragment_ids,
            parallelism,
        };

        self.inner.alter_fragment_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: JobId,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
            table_id,
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest { index_id, cascade };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

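    /// Unregister this worker from the meta service and mark the client as shutting down,
    /// so that a subsequent "unknown worker" heartbeat response does not abort the process.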
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

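    /// Best-effort unregistration: logs the outcome instead of propagating errors.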
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = %self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = %self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[WorkerId],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

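    /// Spawn a background task that sends a heartbeat every `min_interval`.
    /// Returns the join handle and a oneshot sender used to stop the loop.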
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        job_ids: &[JobId],
    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: job_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn get_fragment_vnodes(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
        let resp = self
            .inner
            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
            .await?;
        Ok(resp
            .actor_vnodes
            .into_iter()
            .map(|actor| (actor.actor_id, actor.vnode_indices))
            .collect())
    }

    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
        let resp = self
            .inner
            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
            .await?;
        Ok(resp.vnode_indices)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    pub async fn alter_sink_props(
        &self,
        sink_id: SinkId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_iceberg_table_props(
        &self,
        table_id: TableId,
        sink_id: SinkId,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: table_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::IcebergTable as i32,
            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
                sink_id,
                source_id,
            })),
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
        let req = SetSystemParamRequest { param, value };
        let resp = self.inner.set_system_param(req).await?;
        Ok(resp.params.map(SystemParamsReader::from))
    }

    pub async fn get_session_params(&self) -> Result<String> {
        let req = GetSessionParamsRequest {};
        let resp = self.inner.get_session_params(req).await?;
        Ok(resp.params)
    }

    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
        let req = SetSessionParamRequest { param, value };
        let resp = self.inner.set_session_param(req).await?;
        Ok(resp.param)
    }

    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
        let req = GetDdlProgressRequest {};
        let resp = self.inner.get_ddl_progress(req).await?;
        Ok(resp.ddl_progress)
    }

    pub async fn split_compaction_group(
        &self,
        group_id: CompactionGroupId,
        table_ids_to_new_group: &[TableId],
        partition_vnode_count: u32,
    ) -> Result<CompactionGroupId> {
        let req = SplitCompactionGroupRequest {
            group_id,
            table_ids: table_ids_to_new_group.to_vec(),
            partition_vnode_count,
        };
        let resp = self.inner.split_compaction_group(req).await?;
        Ok(resp.new_group_id)
    }

    pub async fn get_tables(
        &self,
        table_ids: Vec<TableId>,
        include_dropped_tables: bool,
    ) -> Result<HashMap<TableId, Table>> {
        let req = GetTablesRequest {
            table_ids,
            include_dropped_tables,
        };
        let resp = self.inner.get_tables(req).await?;
        Ok(resp.tables)
    }

    pub async fn list_serving_vnode_mappings(
        &self,
    ) -> Result<HashMap<FragmentId, (u32, WorkerSlotMapping)>> {
        let req = GetServingVnodeMappingsRequest {};
        let resp = self.inner.get_serving_vnode_mappings(req).await?;
        let mappings = resp
            .worker_slot_mappings
            .into_iter()
            .map(|p| {
                (
                    p.fragment_id,
                    (
                        resp.fragment_to_table
                            .get(&p.fragment_id)
                            .cloned()
                            .unwrap_or(0),
                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
                    ),
                )
            })
            .collect();
        Ok(mappings)
    }

    pub async fn risectl_list_compaction_status(
        &self,
    ) -> Result<(
        Vec<CompactStatus>,
        Vec<CompactTaskAssignment>,
        Vec<CompactTaskProgress>,
    )> {
        let req = RiseCtlListCompactionStatusRequest {};
        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
        Ok((
            resp.compaction_statuses,
            resp.task_assignment,
            resp.task_progress,
        ))
    }

    pub async fn get_compaction_score(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Result<Vec<PickerInfo>> {
        let req = GetCompactionScoreRequest {
            compaction_group_id,
        };
        let resp = self.inner.get_compaction_score(req).await?;
        Ok(resp.scores)
    }

    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
        let req = RiseCtlRebuildTableStatsRequest {};
        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
        Ok(())
    }

    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
        let req = ListBranchedObjectRequest {};
        let resp = self.inner.list_branched_object(req).await?;
        Ok(resp.branched_objects)
    }

    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
        let req = ListActiveWriteLimitRequest {};
        let resp = self.inner.list_active_write_limit(req).await?;
        Ok(resp.write_limits)
    }

    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
        let req = ListHummockMetaConfigRequest {};
        let resp = self.inner.list_hummock_meta_config(req).await?;
        Ok(resp.configs)
    }

    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
        let _resp = self
            .inner
            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
            .await?;

        Ok(())
    }

    pub async fn rw_cloud_validate_source(
        &self,
        source_type: SourceType,
        source_config: HashMap<String, String>,
    ) -> Result<RwCloudValidateSourceResponse> {
        let req = RwCloudValidateSourceRequest {
            source_type: source_type.into(),
            source_config,
        };
        let resp = self.inner.rw_cloud_validate_source(req).await?;
        Ok(resp)
    }

    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
        self.inner.core.read().await.sink_coordinate_client.clone()
    }

    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
        let req = ListCompactTaskAssignmentRequest {};
        let resp = self.inner.list_compact_task_assignment(req).await?;
        Ok(resp.task_assignment)
    }

    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
        let req = ListEventLogRequest::default();
        let resp = self.inner.list_event_log(req).await?;
        Ok(resp.event_logs)
    }

    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
        let req = ListCompactTaskProgressRequest {};
        let resp = self.inner.list_compact_task_progress(req).await?;
        Ok(resp.task_progress)
    }

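    // Report a worker-node panic to the meta event log, blocking the current thread so the
    // event can be flushed before the process aborts. The madsim variant is intentionally a no-op.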
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }

    #[cfg(not(madsim))]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
        let event = event_log::EventWorkerNodePanic {
            worker_id: self.worker_id,
            worker_type: self.worker_type.into(),
            host_addr: Some(self.host_addr.to_protobuf()),
            panic_info: format!("{panic_info}"),
        };
        let grpc_meta_client = self.inner.clone();
        let _ = thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let req = AddEventLogRequest {
                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
            };
            rt.block_on(async {
                let _ = tokio::time::timeout(
                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
                    grpc_meta_client.add_event_log(req),
                )
                .await;
            });
        })
        .join();
    }

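    /// Report a sink failure to the meta event log.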
    pub async fn add_sink_fail_event(
        &self,
        sink_id: SinkId,
        sink_name: String,
        connector: String,
        error: String,
    ) -> Result<()> {
        let event = event_log::EventSinkFail {
            sink_id,
            sink_name,
            connector,
            error,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::SinkFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn add_cdc_auto_schema_change_fail_event(
        &self,
        source_id: SourceId,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) -> Result<()> {
        let event = event_log::EventAutoSchemaChangeFail {
            table_id: source_id.as_cdc_table_id(),
            table_name,
            cdc_table_id,
            upstream_ddl,
            fail_info,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        let req = CancelCompactTaskRequest {
            task_id,
            task_status: task_status as _,
        };
        let resp = self.inner.cancel_compact_task(req).await?;
        Ok(resp.ret)
    }

    pub async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Result<PbHummockVersion> {
        let req = GetVersionByEpochRequest { epoch, table_id };
        let resp = self.inner.get_version_by_epoch(req).await?;
        Ok(resp.version.unwrap())
    }

    pub async fn get_cluster_limits(
        &self,
    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
        let req = GetClusterLimitsRequest {};
        let resp = self.inner.get_cluster_limits(req).await?;
        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
    }

    pub async fn merge_compaction_group(
        &self,
        left_group_id: CompactionGroupId,
        right_group_id: CompactionGroupId,
    ) -> Result<()> {
        let req = MergeCompactionGroupRequest {
            left_group_id,
            right_group_id,
        };
        self.inner.merge_compaction_group(req).await?;
        Ok(())
    }

    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
        let request = ListRateLimitsRequest {};
        let resp = self.inner.list_rate_limits(request).await?;
        Ok(resp.rate_limits)
    }

    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
        let request = ListCdcProgressRequest {};
        let resp = self.inner.list_cdc_progress(request).await?;
        Ok(resp.cdc_progress)
    }

    pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
        let request = ListRefreshTableStatesRequest {};
        let resp = self.inner.list_refresh_table_states(request).await?;
        Ok(resp.states)
    }

    pub async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIcebergTableRequest {
            table_info: Some(table_job_info),
            sink_info: Some(sink_job_info),
            iceberg_source: Some(iceberg_source),
            if_not_exists,
        };

        let resp = self.inner.create_iceberg_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
        let request = ListIcebergTablesRequest {};
        let resp = self.inner.list_iceberg_tables(request).await?;
        Ok(resp.iceberg_tables)
    }

    pub async fn get_cluster_stack_trace(
        &self,
        actor_traces_format: ActorTracesFormat,
    ) -> Result<StackTraceResponse> {
        let request = StackTraceRequest {
            actor_traces_format: actor_traces_format as i32,
        };
        let resp = self.inner.stack_trace(request).await?;
        Ok(resp)
    }

    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
        self.inner.set_sync_log_store_aligned(request).await?;
        Ok(())
    }

    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
        self.inner.refresh(request).await
    }

    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
        let request = ListUnmigratedTablesRequest {};
        let resp = self.inner.list_unmigrated_tables(request).await?;
        Ok(resp
            .tables
            .into_iter()
            .map(|table| (table.table_id, table.table_name))
            .collect())
    }
}

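// Hummock-related operations are exposed through the `HummockMetaClient` trait, mostly by
// forwarding to the corresponding meta RPCs; committing epochs is left to the meta service itself.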
1892#[async_trait]
1893impl HummockMetaClient for MetaClient {
1894 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1895 let req = UnpinVersionBeforeRequest {
1896 context_id: self.worker_id(),
1897 unpin_version_before: unpin_version_before.to_u64(),
1898 };
1899 self.inner.unpin_version_before(req).await?;
1900 Ok(())
1901 }
1902
1903 async fn get_current_version(&self) -> Result<HummockVersion> {
1904 let req = GetCurrentVersionRequest::default();
1905 Ok(HummockVersion::from_rpc_protobuf(
1906 &self
1907 .inner
1908 .get_current_version(req)
1909 .await?
1910 .current_version
1911 .unwrap(),
1912 ))
1913 }
1914
1915 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1916 let resp = self
1917 .inner
1918 .get_new_object_ids(GetNewObjectIdsRequest { number })
1919 .await?;
1920 Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1921 }
1922
1923 async fn commit_epoch_with_change_log(
1924 &self,
1925 _epoch: HummockEpoch,
1926 _sync_result: SyncResult,
1927 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1928 ) -> Result<()> {
1929 panic!("Only meta service can commit_epoch in production.")
1930 }
1931
1932 async fn trigger_manual_compaction(
1933 &self,
1934 compaction_group_id: u64,
1935 table_id: JobId,
1936 level: u32,
1937 sst_ids: Vec<u64>,
1938 ) -> Result<()> {
1939 let req = TriggerManualCompactionRequest {
1941 compaction_group_id,
1942 table_id,
1943 level,
1946 sst_ids,
1947 ..Default::default()
1948 };
1949
1950 self.inner.trigger_manual_compaction(req).await?;
1951 Ok(())
1952 }
1953
1954 async fn trigger_full_gc(
1955 &self,
1956 sst_retention_time_sec: u64,
1957 prefix: Option<String>,
1958 ) -> Result<()> {
1959 self.inner
1960 .trigger_full_gc(TriggerFullGcRequest {
1961 sst_retention_time_sec,
1962 prefix,
1963 })
1964 .await?;
1965 Ok(())
1966 }
1967
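/// Opens a bidirectional compaction-event subscription: the returned sender pushes
/// `SubscribeCompactionEventRequest`s to the meta service (a `Register` event is sent up
/// front), and the returned stream yields compaction events coming back from it.
///
/// Illustrative sketch of the consumer side, assuming `futures::StreamExt` is in scope:
/// ```ignore
/// let (tx, mut events) = hummock_meta_client.subscribe_compaction_event().await?;
/// // keep `tx` alive: dropping it closes the request stream
/// while let Some(event) = events.next().await {
///     // handle each `CompactionEventItem`
/// }
/// ```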
1968 async fn subscribe_compaction_event(
1969 &self,
1970 ) -> Result<(
1971 UnboundedSender<SubscribeCompactionEventRequest>,
1972 BoxStream<'static, CompactionEventItem>,
1973 )> {
1974 let (request_sender, request_receiver) =
1975 unbounded_channel::<SubscribeCompactionEventRequest>();
1976 request_sender
1977 .send(SubscribeCompactionEventRequest {
1978 event: Some(subscribe_compaction_event_request::Event::Register(
1979 Register {
1980 context_id: self.worker_id(),
1981 },
1982 )),
1983 create_at: SystemTime::now()
1984 .duration_since(SystemTime::UNIX_EPOCH)
1985 .expect("system clock is set before the UNIX epoch")
1986 .as_millis() as u64,
1987 })
1988 .context("Failed to subscribe compaction event")?;
1989
1990 let stream = self
1991 .inner
1992 .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1993 request_receiver,
1994 )))
1995 .await?;
1996
1997 Ok((request_sender, Box::pin(stream)))
1998 }
1999
2000 async fn get_version_by_epoch(
2001 &self,
2002 epoch: HummockEpoch,
2003 table_id: TableId,
2004 ) -> Result<PbHummockVersion> {
2005 self.get_version_by_epoch(epoch, table_id).await
2006 }
2007
2008 async fn subscribe_iceberg_compaction_event(
2009 &self,
2010 ) -> Result<(
2011 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2012 BoxStream<'static, IcebergCompactionEventItem>,
2013 )> {
2014 let (request_sender, request_receiver) =
2015 unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2016 request_sender
2017 .send(SubscribeIcebergCompactionEventRequest {
2018 event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2019 IcebergRegister {
2020 context_id: self.worker_id(),
2021 },
2022 )),
2023 create_at: SystemTime::now()
2024 .duration_since(SystemTime::UNIX_EPOCH)
2025 .expect("system clock is set before the UNIX epoch")
2026 .as_millis() as u64,
2027 })
2028 .context("Failed to subscribe iceberg compaction event")?;
2029
2030 let stream = self
2031 .inner
2032 .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2033 request_receiver,
2034 )))
2035 .await?;
2036
2037 Ok((request_sender, Box::pin(stream)))
2038 }
2039}
2040
2041#[async_trait]
2042impl TelemetryInfoFetcher for MetaClient {
2043 async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2044 let resp = self
2045 .get_telemetry_info()
2046 .await
2047 .map_err(|e| e.to_report_string())?;
2048 let tracking_id = resp.get_tracking_id().ok();
2049 Ok(tracking_id.map(|id| id.to_owned()))
2050 }
2051}
2052
2053pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2054
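/// One gRPC client per meta service, all sharing a single tonic `Channel`. The whole core
/// is rebuilt (see `MetaMemberManagement::recreate_core`) whenever the channel to the meta
/// leader changes.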
2055#[derive(Debug, Clone)]
2056struct GrpcMetaClientCore {
2057 cluster_client: ClusterServiceClient<Channel>,
2058 meta_member_client: MetaMemberServiceClient<Channel>,
2059 heartbeat_client: HeartbeatServiceClient<Channel>,
2060 ddl_client: DdlServiceClient<Channel>,
2061 hummock_client: HummockManagerServiceClient<Channel>,
2062 notification_client: NotificationServiceClient<Channel>,
2063 stream_client: StreamManagerServiceClient<Channel>,
2064 user_client: UserServiceClient<Channel>,
2065 scale_client: ScaleServiceClient<Channel>,
2066 backup_client: BackupServiceClient<Channel>,
2067 telemetry_client: TelemetryInfoServiceClient<Channel>,
2068 system_params_client: SystemParamsServiceClient<Channel>,
2069 session_params_client: SessionParamServiceClient<Channel>,
2070 serving_client: ServingServiceClient<Channel>,
2071 cloud_client: CloudServiceClient<Channel>,
2072 sink_coordinate_client: SinkCoordinationRpcClient,
2073 event_log_client: EventLogServiceClient<Channel>,
2074 cluster_limit_client: ClusterLimitServiceClient<Channel>,
2075 hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
2076 monitor_client: MonitorServiceClient<Channel>,
2077}
2078
2079impl GrpcMetaClientCore {
2080 pub(crate) fn new(channel: Channel) -> Self {
2081 let cluster_client = ClusterServiceClient::new(channel.clone());
2082 let meta_member_client = MetaMemberClient::new(channel.clone());
2083 let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2084 let ddl_client =
2085 DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2086 let hummock_client =
2087 HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2088 let notification_client =
2089 NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2090 let stream_client =
2091 StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2092 let user_client = UserServiceClient::new(channel.clone());
2093 let scale_client =
2094 ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2095 let backup_client = BackupServiceClient::new(channel.clone());
2096 let telemetry_client =
2097 TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2098 let system_params_client = SystemParamsServiceClient::new(channel.clone());
2099 let session_params_client = SessionParamServiceClient::new(channel.clone());
2100 let serving_client = ServingServiceClient::new(channel.clone());
2101 let cloud_client = CloudServiceClient::new(channel.clone());
2102 let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2103 .max_decoding_message_size(usize::MAX);
2104 let event_log_client = EventLogServiceClient::new(channel.clone());
2105 let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2106 let hosted_iceberg_catalog_service_client =
2107 HostedIcebergCatalogServiceClient::new(channel.clone());
2108 let monitor_client = MonitorServiceClient::new(channel);
2109
2110 GrpcMetaClientCore {
2111 cluster_client,
2112 meta_member_client,
2113 heartbeat_client,
2114 ddl_client,
2115 hummock_client,
2116 notification_client,
2117 stream_client,
2118 user_client,
2119 scale_client,
2120 backup_client,
2121 telemetry_client,
2122 system_params_client,
2123 session_params_client,
2124 serving_client,
2125 cloud_client,
2126 sink_coordinate_client,
2127 event_log_client,
2128 cluster_limit_client,
2129 hosted_iceberg_catalog_service_client,
2130 monitor_client,
2131 }
2132 }
2133}
2134
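/// Client to the meta gRPC services that transparently follows leader changes: `core`
/// holds the current per-service clients and is swapped out by the member-monitor task,
/// which can also be woken up on demand via `member_monitor_event_sender`.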
2135#[derive(Debug, Clone)]
2139struct GrpcMetaClient {
2140 member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
2141 core: Arc<RwLock<GrpcMetaClientCore>>,
2142}
2143
2144type MetaMemberClient = MetaMemberServiceClient<Channel>;
2145
2146struct MetaMemberGroup {
2147 members: LruCache<http::Uri, Option<MetaMemberClient>>,
2148}
2149
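/// State owned by the member-monitor task: either a single member client (load-balanced
/// address) or an LRU cache of per-address clients, plus the URI of the leader that the
/// client core is currently connected to.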
2150struct MetaMemberManagement {
2151 core_ref: Arc<RwLock<GrpcMetaClientCore>>,
2152 members: Either<MetaMemberClient, MetaMemberGroup>,
2153 current_leader: http::Uri,
2154 meta_config: Arc<MetaConfig>,
2155}
2156
2157impl MetaMemberManagement {
2158 const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2159
2160 fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2161 format!("http://{}:{}", addr.host, addr.port)
2162 .parse()
2163 .unwrap()
2164 }
2165
2166 async fn recreate_core(&self, channel: Channel) {
2167 let mut core = self.core_ref.write().await;
2168 *core = GrpcMetaClientCore::new(channel);
2169 }
2170
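/// Re-discovers the meta leader. With a load-balanced address the single member client is
/// queried directly; with an address list, the cached members are queried one by one until
/// any of them responds, and newly reported members are added to the LRU group. If the
/// discovered leader differs from `current_leader`, a new channel is established with
/// bounded retries and the client core is recreated on top of it.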
2171 async fn refresh_members(&mut self) -> Result<()> {
2172 let leader_addr = match self.members.as_mut() {
2173 Either::Left(client) => {
2174 let resp = client
2175 .to_owned()
2176 .members(MembersRequest {})
2177 .await
2178 .map_err(RpcError::from_meta_status)?;
2179 let resp = resp.into_inner();
2180 resp.members.into_iter().find(|member| member.is_leader)
2181 }
2182 Either::Right(member_group) => {
2183 let mut fetched_members = None;
2184
2185 for (addr, client) in &mut member_group.members {
2186 let members: Result<_> = try {
2187 let mut client = match client {
2188 Some(cached_client) => cached_client.to_owned(),
2189 None => {
2190 let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2191 let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2192 .await
2193 .context("failed to create client")?;
2194 let new_client: MetaMemberClient =
2195 MetaMemberServiceClient::new(channel);
2196 *client = Some(new_client.clone());
2197
2198 new_client
2199 }
2200 };
2201
2202 let resp = client
2203 .members(MembersRequest {})
2204 .await
2205 .context("failed to fetch members")?;
2206
2207 resp.into_inner().members
2208 };
2209
2210 let fetched = members.is_ok();
2211 fetched_members = Some(members);
2212 if fetched {
2213 break;
2214 }
2215 }
2216
2217 let members = fetched_members
2218 .context("no member available in the list")?
2219 .context("could not refresh members")?;
2220
2221 let mut leader = None;
2223 for member in members {
2224 if member.is_leader {
2225 leader = Some(member.clone());
2226 }
2227
2228 let addr = Self::host_address_to_uri(member.address.unwrap());
2229 if !member_group.members.contains(&addr) {
2231 tracing::info!("new meta member joined: {}", addr);
2232 member_group.members.put(addr, None);
2233 }
2234 }
2235
2236 leader
2237 }
2238 };
2239
2240 if let Some(leader) = leader_addr {
2241 let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2242
2243 if discovered_leader != self.current_leader {
2244 tracing::info!("new meta leader {} discovered", discovered_leader);
2245
2246 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2247 Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2248 false,
2249 );
2250
2251 let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2252 let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2253 GrpcMetaClient::connect_to_endpoint(endpoint).await
2254 })
2255 .await?;
2256
2257 self.recreate_core(channel).await;
2258 self.current_leader = discovered_leader;
2259 }
2260 }
2261
2262 Ok(())
2263 }
2264}
2265
2266impl GrpcMetaClient {
2267 const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2269 const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2271 const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2273 const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2275
2276 fn start_meta_member_monitor(
2277 &self,
2278 init_leader_addr: http::Uri,
2279 members: Either<MetaMemberClient, MetaMemberGroup>,
2280 force_refresh_receiver: Receiver<Sender<Result<()>>>,
2281 meta_config: Arc<MetaConfig>,
2282 ) -> Result<()> {
2283 let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2284 let current_leader = init_leader_addr;
2285
2286 let enable_period_tick = matches!(members, Either::Right(_));
2287
2288 let member_management = MetaMemberManagement {
2289 core_ref,
2290 members,
2291 current_leader,
2292 meta_config,
2293 };
2294
2295 let mut force_refresh_receiver = force_refresh_receiver;
2296
2297 tokio::spawn(async move {
2298 let mut member_management = member_management;
2299 let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2300
2301 loop {
2302 let event: Option<Sender<Result<()>>> = if enable_period_tick {
2303 tokio::select! {
2304 _ = ticker.tick() => None,
2305 result_sender = force_refresh_receiver.recv() => {
2306 if result_sender.is_none() {
2307 break;
2308 }
2309
2310 result_sender
2311 },
2312 }
2313 } else {
2314 let result_sender = force_refresh_receiver.recv().await;
2315
2316 if result_sender.is_none() {
2317 break;
2318 }
2319
2320 result_sender
2321 };
2322
2323 let tick_result = member_management.refresh_members().await;
2324 if let Err(e) = tick_result.as_ref() {
2325 tracing::warn!(error = %e.as_report(), "failed to refresh meta members");
2326 }
2327
2328 if let Some(sender) = event {
2329 let _resp = sender.send(tick_result);
2331 }
2332 }
2333 });
2334
2335 Ok(())
2336 }
2337
2338 async fn force_refresh_leader(&self) -> Result<()> {
2339 let (sender, receiver) = oneshot::channel();
2340
2341 self.member_monitor_event_sender
2342 .send(sender)
2343 .await
2344 .map_err(|e| anyhow!(e))?;
2345
2346 receiver.await.map_err(|e| anyhow!(e))?
2347 }
2348
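/// Connects according to the address strategy, spawns the member monitor, and forces one
/// leader refresh before returning, so the resulting client already points at the current
/// meta leader.
///
/// Illustrative sketch, assuming `MetaAddressStrategy::List` wraps a `Vec<http::Uri>` and
/// that `MetaConfig` implements `Default`:
/// ```ignore
/// let strategy = MetaAddressStrategy::List(vec!["http://127.0.0.1:5690".parse()?]);
/// let client = GrpcMetaClient::new(&strategy, Arc::new(MetaConfig::default())).await?;
/// ```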
2349 pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2351 let (channel, addr) = match strategy {
2352 MetaAddressStrategy::LoadBalance(addr) => {
2353 Self::try_build_rpc_channel(vec![addr.clone()]).await
2354 }
2355 MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2356 }?;
2357 let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2358 let client = GrpcMetaClient {
2359 member_monitor_event_sender: force_refresh_sender,
2360 core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2361 };
2362
2363 let meta_member_client = client.core.read().await.meta_member_client.clone();
2364 let members = match strategy {
2365 MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2366 MetaAddressStrategy::List(addrs) => {
2367 let mut members = LruCache::new(20.try_into().unwrap());
2368 for addr in addrs {
2369 members.put(addr.clone(), None);
2370 }
2371 members.put(addr.clone(), Some(meta_member_client));
2372
2373 Either::Right(MetaMemberGroup { members })
2374 }
2375 };
2376
2377 client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2378
2379 client.force_refresh_leader().await?;
2380
2381 Ok(client)
2382 }
2383
2384 fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2385 Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2386 }
2387
2388 pub(crate) async fn try_build_rpc_channel(
2389 addrs: impl IntoIterator<Item = http::Uri>,
2390 ) -> Result<(Channel, http::Uri)> {
2391 let endpoints: Vec<_> = addrs
2392 .into_iter()
2393 .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2394 .collect();
2395
2396 let mut last_error = None;
2397
2398 for (endpoint, addr) in endpoints {
2399 match Self::connect_to_endpoint(endpoint).await {
2400 Ok(channel) => {
2401 tracing::info!("Connected to meta server {} successfully", addr);
2402 return Ok((channel, addr));
2403 }
2404 Err(e) => {
2405 tracing::warn!(
2406 error = %e.as_report(),
2407 "Failed to connect to meta server {}, trying other addresses",
2408 addr,
2409 );
2410 last_error = Some(e);
2411 }
2412 }
2413 }
2414
2415 if let Some(last_error) = last_error {
2416 Err(anyhow::anyhow!(last_error)
2417 .context("failed to connect to all meta servers")
2418 .into())
2419 } else {
2420 bail!("no meta server address provided")
2421 }
2422 }
2423
2424 async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2425 let channel = endpoint
2426 .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2427 .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2428 .connect_timeout(Duration::from_secs(5))
2429 .monitored_connect("grpc-meta-client", Default::default())
2430 .await?
2431 .wrapped();
2432
2433 Ok(channel)
2434 }
2435
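/// Builds a jittered exponential-backoff iterator that starts at
/// `INIT_RETRY_BASE_INTERVAL_MS`, is capped at `INIT_RETRY_MAX_INTERVAL_MS`, and yields
/// delays only while their running sum stays below `high_bound`. With `exceed == true`,
/// the single delay that crosses the bound is still yielded; otherwise the iterator stops
/// strictly before the bound is reached.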
2436 pub(crate) fn retry_strategy_to_bound(
2437 high_bound: Duration,
2438 exceed: bool,
2439 ) -> impl Iterator<Item = Duration> {
2440 let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2441 .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2442 .map(jitter);
2443
2444 let mut sum = Duration::default();
2445
2446 iter.take_while(move |duration| {
2447 sum += *duration;
2448
2449 if exceed {
2450 sum < high_bound + *duration
2451 } else {
2452 sum < high_bound
2453 }
2454 })
2455 }
2456}
2457
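/// Enumerates every meta RPC as `{ client field, method, request type, response type }`
/// and hands the list to the given macro; `meta_rpc_client_method_impl` (applied at the
/// bottom of this file) expands each entry into a forwarding method on `GrpcMetaClient`.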
2458macro_rules! for_all_meta_rpc {
2459 ($macro:ident) => {
2460 $macro! {
2461 { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2462 ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2463 ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2464 ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2465 ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2466 ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2467 ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2468 ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2469 ,{ stream_client, flush, FlushRequest, FlushResponse }
2470 ,{ stream_client, pause, PauseRequest, PauseResponse }
2471 ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2472 ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2473 ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2474 ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2475 ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2476 ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2477 ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2478 ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2479 ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2480 ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
2481 ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2482 ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2483 ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
2484 ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
2485 ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2486 ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2487 ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
2488 ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
2489 ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2490 ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2491 ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
2492 ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2493 ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2494 ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2495 ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2496 ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2497 ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
2498 ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
2499 ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2500 ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2501 ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2502 ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2503 ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2504 ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2505 ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2506 ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2507 ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2508 ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2509 ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2510 ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2511 ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2512 ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2513 ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2514 ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2515 ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2516 ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2517 ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2518 ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2519 ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2520 ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2521 ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2522 ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2523 ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2524 ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2525 ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2526 ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2527 ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2528 ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2529 ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2530 ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2531 ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2532 ,{ ddl_client, wait, WaitRequest, WaitResponse }
2533 ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2534 ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2535 ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2536 ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2537 ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
2538 ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
2539 ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2540 ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2541 ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2542 ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2543 ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2544 ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2545 ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2546 ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2547 ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2548 ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2549 ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2550 ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2551 ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2552 ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2553 ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2554 ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2555 ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2556 ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2557 ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2558 ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2559 ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2560 ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2561 ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2562 ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2563 ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2564 ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2565 ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2566 ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2567 ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2568 ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2569 ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2570 ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2571 ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2572 ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2573 ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2574 ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2575 ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2576 ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2577 ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2578 ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2579 ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2580 ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2581 ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2582 ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2583 ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2584 ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2585 ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2586 ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2587 ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2588 ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2589 ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2590 ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2591 ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2592 ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2593 ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2594 ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2595 }
2596 };
2597}
2598
2599impl GrpcMetaClient {
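/// Treats `Unknown`, `Unimplemented`, and `Unavailable` status codes as hints that the
/// cached leader connection may be stale and nudges the member monitor to refresh it. The
/// send is non-blocking: if a refresh is already queued, this call only logs and returns.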
2600 async fn refresh_client_if_needed(&self, code: Code) {
2601 if matches!(
2602 code,
2603 Code::Unknown | Code::Unimplemented | Code::Unavailable
2604 ) {
2605 tracing::debug!("tonic status code {} matched, trying to refresh the meta client", code);
2606 let (result_sender, result_receiver) = oneshot::channel();
2607 if self
2608 .member_monitor_event_sender
2609 .try_send(result_sender)
2610 .is_ok()
2611 {
2612 if let Ok(Err(e)) = result_receiver.await {
2613 tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2614 }
2615 } else {
2616 tracing::debug!("skipping this refresh since another refresh is already in progress")
2617 }
2618 }
2619 }
2620}
2621
2622impl GrpcMetaClient {
2623 for_all_meta_rpc! { meta_rpc_client_method_impl }
2624}