use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::id::{
    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, ViewId, WorkerId,
};
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::id::{ActorId, FragmentId, SourceId};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

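/// Client to the meta service. All RPCs go through the shared `GrpcMetaClient`,
/// so cloning a `MetaClient` is lightweight.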
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: WorkerId,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> WorkerId {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

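    /// Subscribe to notifications of the given type from the meta service.
    /// The request is retried with a backoff bounded by
    /// `max_heartbeat_interval_secs`.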
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest { secret_id };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

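    /// Register the current worker node to the cluster and fetch the initial
    /// system parameters. On unrecoverable failure this logs the error and
    /// exits the process.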
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

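        // Install a process-wide panic hook (at most once) that reports the
        // panic to the meta event log before the default hook runs.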
        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

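    /// Activate the current worker node in the cluster, retrying within the
    /// heartbeat window until the meta service acknowledges it.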
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

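    /// Send a heartbeat for the given worker to the meta service. If the meta
    /// service reports the worker as expired and no shutdown is in progress,
    /// the process exits.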
    pub async fn send_heartbeat(&self, node_id: WorkerId) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Exit the process unless an unregistration is already in progress.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            resource_type: Some(PbStreamingJobResourceType {
                resource_type: Some(resource_type),
            }),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest { table_id, cascade };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id: job_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()> {
        let request = AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        };

        self.inner.alter_streaming_job_config(request).await?;
        Ok(())
    }

    pub async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()> {
        let request = AlterFragmentParallelismRequest {
            fragment_ids,
            parallelism,
        };

        self.inner.alter_fragment_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: JobId,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
            table_id,
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
        let request = ResetSourceRequest { source_id };
        let resp = self.inner.reset_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest { index_id, cascade };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

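    /// Unregister the current worker from the cluster and mark this client as
    /// shutting down, so that a late "worker expired" heartbeat response does
    /// not kill the process.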
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

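    /// Unregister from the cluster, logging the outcome instead of returning
    /// an error.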
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = %self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = %self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[WorkerId],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

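    /// Spawn a task that sends a heartbeat every `min_interval`, with each
    /// attempt timing out after `3 * min_interval`. Returns the join handle
    /// and a shutdown sender that stops the loop.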
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

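    /// Trigger a flush for the given database and return the resulting
    /// Hummock version id.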
    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        job_ids: &[JobId],
    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: job_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn get_fragment_vnodes(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
        let resp = self
            .inner
            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
            .await?;
        Ok(resp
            .actor_vnodes
            .into_iter()
            .map(|actor| (actor.actor_id, actor.vnode_indices))
            .collect())
    }

    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
        let resp = self
            .inner
            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
            .await?;
        Ok(resp.vnode_indices)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    pub async fn alter_sink_props(
        &self,
        sink_id: SinkId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_iceberg_table_props(
        &self,
        table_id: TableId,
        sink_id: SinkId,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: table_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::IcebergTable as i32,
            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
                sink_id,
                source_id,
            })),
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_connection_connector_props(
        &self,
        connection_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: connection_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref: None,
            object_type: AlterConnectorPropsObject::Connection as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
        let req = SetSystemParamRequest { param, value };
        let resp = self.inner.set_system_param(req).await?;
        Ok(resp.params.map(SystemParamsReader::from))
    }

    pub async fn get_session_params(&self) -> Result<String> {
        let req = GetSessionParamsRequest {};
        let resp = self.inner.get_session_params(req).await?;
        Ok(resp.params)
    }

    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
        let req = SetSessionParamRequest { param, value };
        let resp = self.inner.set_session_param(req).await?;
        Ok(resp.param)
    }

    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
        let req = GetDdlProgressRequest {};
        let resp = self.inner.get_ddl_progress(req).await?;
        Ok(resp.ddl_progress)
    }

    pub async fn split_compaction_group(
        &self,
        group_id: CompactionGroupId,
        table_ids_to_new_group: &[TableId],
        partition_vnode_count: u32,
    ) -> Result<CompactionGroupId> {
        let req = SplitCompactionGroupRequest {
            group_id,
            table_ids: table_ids_to_new_group.to_vec(),
            partition_vnode_count,
        };
        let resp = self.inner.split_compaction_group(req).await?;
        Ok(resp.new_group_id)
    }

    pub async fn get_tables(
        &self,
        table_ids: Vec<TableId>,
        include_dropped_tables: bool,
    ) -> Result<HashMap<TableId, Table>> {
        let req = GetTablesRequest {
            table_ids,
            include_dropped_tables,
        };
        let resp = self.inner.get_tables(req).await?;
        Ok(resp.tables)
    }

    pub async fn list_serving_vnode_mappings(
        &self,
    ) -> Result<HashMap<FragmentId, (u32, WorkerSlotMapping)>> {
        let req = GetServingVnodeMappingsRequest {};
        let resp = self.inner.get_serving_vnode_mappings(req).await?;
        let mappings = resp
            .worker_slot_mappings
            .into_iter()
            .map(|p| {
                (
                    p.fragment_id,
                    (
                        resp.fragment_to_table
                            .get(&p.fragment_id)
                            .cloned()
                            .unwrap_or(0),
                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
                    ),
                )
            })
            .collect();
        Ok(mappings)
    }

    pub async fn risectl_list_compaction_status(
        &self,
    ) -> Result<(
        Vec<CompactStatus>,
        Vec<CompactTaskAssignment>,
        Vec<CompactTaskProgress>,
    )> {
        let req = RiseCtlListCompactionStatusRequest {};
        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
        Ok((
            resp.compaction_statuses,
            resp.task_assignment,
            resp.task_progress,
        ))
    }

    pub async fn get_compaction_score(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Result<Vec<PickerInfo>> {
        let req = GetCompactionScoreRequest {
            compaction_group_id,
        };
        let resp = self.inner.get_compaction_score(req).await?;
        Ok(resp.scores)
    }

    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
        let req = RiseCtlRebuildTableStatsRequest {};
        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
        Ok(())
    }

    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
        let req = ListBranchedObjectRequest {};
        let resp = self.inner.list_branched_object(req).await?;
        Ok(resp.branched_objects)
    }

    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
        let req = ListActiveWriteLimitRequest {};
        let resp = self.inner.list_active_write_limit(req).await?;
        Ok(resp.write_limits)
    }

    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
        let req = ListHummockMetaConfigRequest {};
        let resp = self.inner.list_hummock_meta_config(req).await?;
        Ok(resp.configs)
    }

    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
        let _resp = self
            .inner
            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
            .await?;

        Ok(())
    }

    pub async fn rw_cloud_validate_source(
        &self,
        source_type: SourceType,
        source_config: HashMap<String, String>,
    ) -> Result<RwCloudValidateSourceResponse> {
        let req = RwCloudValidateSourceRequest {
            source_type: source_type.into(),
            source_config,
        };
        let resp = self.inner.rw_cloud_validate_source(req).await?;
        Ok(resp)
    }

    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
        self.inner.core.read().await.sink_coordinate_client.clone()
    }

    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
        let req = ListCompactTaskAssignmentRequest {};
        let resp = self.inner.list_compact_task_assignment(req).await?;
        Ok(resp.task_assignment)
    }

    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
        let req = ListEventLogRequest::default();
        let resp = self.inner.list_event_log(req).await?;
        Ok(resp.event_logs)
    }

    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
        let req = ListCompactTaskProgressRequest {};
        let resp = self.inner.list_compact_task_progress(req).await?;
        Ok(resp.task_progress)
    }

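    /// Reporting panic events is a no-op in madsim (deterministic simulation)
    /// builds.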
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }

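    /// Report a worker-node panic to the meta event log. Since this may run
    /// inside a panic hook without a usable async context, it spawns a
    /// dedicated thread with its own runtime and blocks until the report is
    /// sent or `timeout_millis` (default 1000) elapses.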
    #[cfg(not(madsim))]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
        let event = event_log::EventWorkerNodePanic {
            worker_id: self.worker_id,
            worker_type: self.worker_type.into(),
            host_addr: Some(self.host_addr.to_protobuf()),
            panic_info: format!("{panic_info}"),
        };
        let grpc_meta_client = self.inner.clone();
        let _ = thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let req = AddEventLogRequest {
                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
            };
            rt.block_on(async {
                let _ = tokio::time::timeout(
                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
                    grpc_meta_client.add_event_log(req),
                )
                .await;
            });
        })
        .join();
    }

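    /// Report a sink failure to the meta event log.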
    pub async fn add_sink_fail_event(
        &self,
        sink_id: SinkId,
        sink_name: String,
        connector: String,
        error: String,
    ) -> Result<()> {
        let event = event_log::EventSinkFail {
            sink_id,
            sink_name,
            connector,
            error,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::SinkFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }
1777
1778 pub async fn add_cdc_auto_schema_change_fail_event(
1779 &self,
1780 source_id: SourceId,
1781 table_name: String,
1782 cdc_table_id: String,
1783 upstream_ddl: String,
1784 fail_info: String,
1785 ) -> Result<()> {
1786 let event = event_log::EventAutoSchemaChangeFail {
1787 table_id: source_id.as_cdc_table_id(),
1788 table_name,
1789 cdc_table_id,
1790 upstream_ddl,
1791 fail_info,
1792 };
1793 let req = AddEventLogRequest {
1794 event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1795 };
1796 self.inner.add_event_log(req).await?;
1797 Ok(())
1798 }
1799
1800 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1801 let req = CancelCompactTaskRequest {
1802 task_id,
1803 task_status: task_status as _,
1804 };
1805 let resp = self.inner.cancel_compact_task(req).await?;
1806 Ok(resp.ret)
1807 }
1808
1809 pub async fn get_version_by_epoch(
1810 &self,
1811 epoch: HummockEpoch,
1812 table_id: TableId,
1813 ) -> Result<PbHummockVersion> {
1814 let req = GetVersionByEpochRequest { epoch, table_id };
1815 let resp = self.inner.get_version_by_epoch(req).await?;
1816 Ok(resp.version.unwrap())
1817 }
1818
1819 pub async fn get_cluster_limits(
1820 &self,
1821 ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1822 let req = GetClusterLimitsRequest {};
1823 let resp = self.inner.get_cluster_limits(req).await?;
1824 Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1825 }
1826
1827 pub async fn merge_compaction_group(
1828 &self,
1829 left_group_id: CompactionGroupId,
1830 right_group_id: CompactionGroupId,
1831 ) -> Result<()> {
1832 let req = MergeCompactionGroupRequest {
1833 left_group_id,
1834 right_group_id,
1835 };
1836 self.inner.merge_compaction_group(req).await?;
1837 Ok(())
1838 }
1839
1840 pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1842 let request = ListRateLimitsRequest {};
1843 let resp = self.inner.list_rate_limits(request).await?;
1844 Ok(resp.rate_limits)
1845 }
1846
1847 pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1848 let request = ListCdcProgressRequest {};
1849 let resp = self.inner.list_cdc_progress(request).await?;
1850 Ok(resp.cdc_progress)
1851 }
1852
1853 pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1854 let request = ListRefreshTableStatesRequest {};
1855 let resp = self.inner.list_refresh_table_states(request).await?;
1856 Ok(resp.states)
1857 }
1858
    pub async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIcebergTableRequest {
            table_info: Some(table_job_info),
            sink_info: Some(sink_job_info),
            iceberg_source: Some(iceberg_source),
            if_not_exists,
        };

        let resp = self.inner.create_iceberg_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
        let request = ListIcebergTablesRequest {};
        let resp = self.inner.list_iceberg_tables(request).await?;
        Ok(resp.iceberg_tables)
    }

    pub async fn get_cluster_stack_trace(
        &self,
        actor_traces_format: ActorTracesFormat,
    ) -> Result<StackTraceResponse> {
        let request = StackTraceRequest {
            actor_traces_format: actor_traces_format as i32,
        };
        let resp = self.inner.stack_trace(request).await?;
        Ok(resp)
    }

    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
        self.inner.set_sync_log_store_aligned(request).await?;
        Ok(())
    }

    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
        self.inner.refresh(request).await
    }

    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
        let request = ListUnmigratedTablesRequest {};
        let resp = self.inner.list_unmigrated_tables(request).await?;
        Ok(resp
            .tables
            .into_iter()
            .map(|table| (table.table_id, table.table_name))
            .collect())
    }
}

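// `MetaClient` also acts as the `HummockMetaClient` used by storage components: each method
// below forwards to the corresponding meta RPC. `commit_epoch_with_change_log` deliberately
// panics, because only the meta service itself commits epochs in production.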
#[async_trait]
impl HummockMetaClient for MetaClient {
    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
        let req = UnpinVersionBeforeRequest {
            context_id: self.worker_id(),
            unpin_version_before: unpin_version_before.to_u64(),
        };
        self.inner.unpin_version_before(req).await?;
        Ok(())
    }

    async fn get_current_version(&self) -> Result<HummockVersion> {
        let req = GetCurrentVersionRequest::default();
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .get_current_version(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
        let resp = self
            .inner
            .get_new_object_ids(GetNewObjectIdsRequest { number })
            .await?;
        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
    }

    async fn commit_epoch_with_change_log(
        &self,
        _epoch: HummockEpoch,
        _sync_result: SyncResult,
        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
    ) -> Result<()> {
        panic!("Only meta service can commit_epoch in production.")
    }

    async fn trigger_manual_compaction(
        &self,
        compaction_group_id: u64,
        table_id: JobId,
        level: u32,
        sst_ids: Vec<u64>,
    ) -> Result<()> {
        let req = TriggerManualCompactionRequest {
            compaction_group_id,
            table_id,
            level,
            sst_ids,
            ..Default::default()
        };

        self.inner.trigger_manual_compaction(req).await?;
        Ok(())
    }

    async fn trigger_full_gc(
        &self,
        sst_retention_time_sec: u64,
        prefix: Option<String>,
    ) -> Result<()> {
        self.inner
            .trigger_full_gc(TriggerFullGcRequest {
                sst_retention_time_sec,
                prefix,
            })
            .await?;
        Ok(())
    }

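    // The compaction event subscription is a small handshake: a `Register` event carrying this
    // worker's id (and a millisecond timestamp) is pushed into the request channel *before* the
    // receiver is handed to tonic, so the server sees the registration as the first message of
    // the bidirectional stream. The caller gets back the sender for further events plus the
    // boxed response stream.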
    async fn subscribe_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeCompactionEventRequest>,
        BoxStream<'static, CompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeCompactionEventRequest>();
        request_sender
            .send(SubscribeCompactionEventRequest {
                event: Some(subscribe_compaction_event_request::Event::Register(
                    Register {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe compaction event")?;

        let stream = self
            .inner
            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }

    async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Result<PbHummockVersion> {
        self.get_version_by_epoch(epoch, table_id).await
    }

    async fn subscribe_iceberg_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
        BoxStream<'static, IcebergCompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
        request_sender
            .send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
                    IcebergRegister {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe iceberg compaction event")?;

        let stream = self
            .inner
            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }
}

#[async_trait]
impl TelemetryInfoFetcher for MetaClient {
    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
        let resp = self
            .get_telemetry_info()
            .await
            .map_err(|e| e.to_report_string())?;
        let tracking_id = resp.get_tracking_id().ok();
        Ok(tracking_id.map(|id| id.to_owned()))
    }
}

pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;

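/// One stub per meta gRPC service, all multiplexed over a single shared `Channel`. Cloning is
/// cheap (each stub just clones the underlying channel), and the whole core is rebuilt with
/// `GrpcMetaClientCore::new` whenever the client reconnects to a new leader.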
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}

impl GrpcMetaClientCore {
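    /// Builds every service stub from one channel. Stubs that may receive large payloads
    /// (DDL, hummock, notification, streaming, scale, telemetry, sink coordination) lift
    /// tonic's default decoding limit via `max_decoding_message_size(usize::MAX)`.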
    pub(crate) fn new(channel: Channel) -> Self {
        let cluster_client = ClusterServiceClient::new(channel.clone());
        let meta_member_client = MetaMemberClient::new(channel.clone());
        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
        let ddl_client =
            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let hummock_client =
            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let notification_client =
            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let stream_client =
            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let user_client = UserServiceClient::new(channel.clone());
        let scale_client =
            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let backup_client = BackupServiceClient::new(channel.clone());
        let telemetry_client =
            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let system_params_client = SystemParamsServiceClient::new(channel.clone());
        let session_params_client = SessionParamServiceClient::new(channel.clone());
        let serving_client = ServingServiceClient::new(channel.clone());
        let cloud_client = CloudServiceClient::new(channel.clone());
        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
            .max_decoding_message_size(usize::MAX);
        let event_log_client = EventLogServiceClient::new(channel.clone());
        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
        let hosted_iceberg_catalog_service_client =
            HostedIcebergCatalogServiceClient::new(channel.clone());
        let monitor_client = MonitorServiceClient::new(channel);

        GrpcMetaClientCore {
            cluster_client,
            meta_member_client,
            heartbeat_client,
            ddl_client,
            hummock_client,
            notification_client,
            stream_client,
            user_client,
            scale_client,
            backup_client,
            telemetry_client,
            system_params_client,
            session_params_client,
            serving_client,
            cloud_client,
            sink_coordinate_client,
            event_log_client,
            cluster_limit_client,
            hosted_iceberg_catalog_service_client,
            monitor_client,
        }
    }
}

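/// Handle to the meta service shared by `MetaClient`. It holds the sender used to ask the
/// member-monitor task for a leader refresh, plus the swappable `GrpcMetaClientCore`.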
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    core: Arc<RwLock<GrpcMetaClientCore>>,
}

type MetaMemberClient = MetaMemberServiceClient<Channel>;

struct MetaMemberGroup {
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}

struct MetaMemberManagement {
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    members: Either<MetaMemberClient, MetaMemberGroup>,
    current_leader: http::Uri,
    meta_config: Arc<MetaConfig>,
}

impl MetaMemberManagement {
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
        format!("http://{}:{}", addr.host, addr.port)
            .parse()
            .unwrap()
    }

    async fn recreate_core(&self, channel: Channel) {
        let mut core = self.core_ref.write().await;
        *core = GrpcMetaClientCore::new(channel);
    }

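    /// Refreshes the member list and the current leader. With a load-balanced address the
    /// single client is queried directly; with an address list, cached per-member clients are
    /// probed (and lazily created) until one responds. When a new leader is discovered, the
    /// channel is reconnected with bounded retries and the client core is swapped in place.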
    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            Either::Right(member_group) => {
                let mut fetched_members = None;

                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")?;

                        resp.into_inner().members
                    };

                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}

impl GrpcMetaClient {
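    // Keep-alive and initial-retry tuning; see `connect_to_endpoint` and
    // `retry_strategy_to_bound` for where each constant is applied.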
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;

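    /// Spawns the background member-monitor task. Under the `List` address strategy it
    /// refreshes both on a fixed tick and on demand; under `LoadBalance` it refreshes only on
    /// demand. Every forced refresh carries a oneshot sender so the requester learns the
    /// outcome.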
    fn start_meta_member_monitor(
        &self,
        init_leader_addr: http::Uri,
        members: Either<MetaMemberClient, MetaMemberGroup>,
        force_refresh_receiver: Receiver<Sender<Result<()>>>,
        meta_config: Arc<MetaConfig>,
    ) -> Result<()> {
        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
        let current_leader = init_leader_addr;

        let enable_period_tick = matches!(members, Either::Right(_));

        let member_management = MetaMemberManagement {
            core_ref,
            members,
            current_leader,
            meta_config,
        };

        let mut force_refresh_receiver = force_refresh_receiver;

        tokio::spawn(async move {
            let mut member_management = member_management;
            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);

            loop {
                let event: Option<Sender<Result<()>>> = if enable_period_tick {
                    tokio::select! {
                        _ = ticker.tick() => None,
                        result_sender = force_refresh_receiver.recv() => {
                            if result_sender.is_none() {
                                break;
                            }

                            result_sender
                        },
                    }
                } else {
                    let result_sender = force_refresh_receiver.recv().await;

                    if result_sender.is_none() {
                        break;
                    }

                    result_sender
                };

                let tick_result = member_management.refresh_members().await;
                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
                }

                if let Some(sender) = event {
                    let _resp = sender.send(tick_result);
                }
            }
        });

        Ok(())
    }

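    /// Sends a refresh request to the member monitor and waits for the result, mapping both
    /// channel failures and the refresh outcome into a single `Result`.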
    async fn force_refresh_leader(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.member_monitor_event_sender
            .send(sender)
            .await
            .map_err(|e| anyhow!(e))?;

        receiver.await.map_err(|e| anyhow!(e))?
    }

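    /// Connects according to the address strategy, spawns the member monitor, and forces an
    /// initial leader refresh before returning.
    ///
    /// A minimal usage sketch (the endpoint URI is hypothetical, and `MetaConfig` is assumed
    /// to provide a usable `Default`):
    ///
    /// ```ignore
    /// let strategy = MetaAddressStrategy::List(vec!["http://meta-node-0:5690".parse()?]);
    /// let client = GrpcMetaClient::new(&strategy, Arc::new(MetaConfig::default())).await?;
    /// ```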
    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
        let (channel, addr) = match strategy {
            MetaAddressStrategy::LoadBalance(addr) => {
                Self::try_build_rpc_channel(vec![addr.clone()]).await
            }
            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
        }?;
        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
        let client = GrpcMetaClient {
            member_monitor_event_sender: force_refresh_sender,
            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
        };

        let meta_member_client = client.core.read().await.meta_member_client.clone();
        let members = match strategy {
            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
            MetaAddressStrategy::List(addrs) => {
                let mut members = LruCache::new(20.try_into().unwrap());
                for addr in addrs {
                    members.put(addr.clone(), None);
                }
                members.put(addr.clone(), Some(meta_member_client));

                Either::Right(MetaMemberGroup { members })
            }
        };

        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;

        client.force_refresh_leader().await?;

        Ok(client)
    }

    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
    }

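    /// Tries each address in order and returns the first channel that connects, together with
    /// the address that succeeded. The last connection error is kept so the failure message
    /// points at a concrete attempt.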
    pub(crate) async fn try_build_rpc_channel(
        addrs: impl IntoIterator<Item = http::Uri>,
    ) -> Result<(Channel, http::Uri)> {
        let endpoints: Vec<_> = addrs
            .into_iter()
            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
            .collect();

        let mut last_error = None;

        for (endpoint, addr) in endpoints {
            match Self::connect_to_endpoint(endpoint).await {
                Ok(channel) => {
                    tracing::info!("Connected to meta server {} successfully", addr);
                    return Ok((channel, addr));
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying again",
                        addr,
                    );
                    last_error = Some(e);
                }
            }
        }

        if let Some(last_error) = last_error {
            Err(anyhow::anyhow!(last_error)
                .context("failed to connect to all meta servers")
                .into())
        } else {
            bail!("no meta server address provided")
        }
    }

    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
        let channel = endpoint
            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-meta-client", Default::default())
            .await?
            .wrapped();

        Ok(channel)
    }

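    // Yields jittered exponential-backoff delays whose running sum stays below `high_bound`;
    // with `exceed` set, one extra step past the bound is allowed. Before jitter, the raw
    // delays are 10 ms, 100 ms, 1000 ms, then capped at 2000 ms per step.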
    pub(crate) fn retry_strategy_to_bound(
        high_bound: Duration,
        exceed: bool,
    ) -> impl Iterator<Item = Duration> {
        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
            .map(jitter);

        let mut sum = Duration::default();

        iter.take_while(move |duration| {
            sum += *duration;

            if exceed {
                sum < high_bound + *duration
            } else {
                sum < high_bound
            }
        })
    }
}

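// Every meta RPC is listed once as `{ client_stub, method, RequestType, ResponseType }`. The
// list is expanded by `meta_rpc_client_method_impl` (see the bottom of this file) into one
// async wrapper method on `GrpcMetaClient` per row.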
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
            { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse }
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse }
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}

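// A failed RPC with one of the codes below may mean the node we are talking to is no longer
// the leader (or is unreachable), so the member monitor is asked to refresh. `try_send` on the
// size-1 channel deduplicates concurrent requests: if a refresh is already queued, this one is
// skipped.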
impl GrpcMetaClient {
    async fn refresh_client_if_needed(&self, code: Code) {
        if matches!(
            code,
            Code::Unknown | Code::Unimplemented | Code::Unavailable
        ) {
            tracing::debug!("matching tonic code {}", code);
            let (result_sender, result_receiver) = oneshot::channel();
            if self
                .member_monitor_event_sender
                .try_send(result_sender)
                .is_ok()
            {
                if let Ok(Err(e)) = result_receiver.await {
                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
                }
            } else {
                tracing::debug!("skipping the current refresh, somewhere else is already doing it");
            }
        }
    }
}

impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}