1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, SecretId, TableId};
33use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
34use risingwave_common::hash::WorkerSlotMapping;
35use risingwave_common::monitor::EndpointExt;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::telemetry::report::TelemetryInfoFetcher;
38use risingwave_common::util::addr::HostAddr;
39use risingwave_common::util::column_index_mapping::ColIndexMapping;
40use risingwave_common::util::meta_addr::MetaAddressStrategy;
41use risingwave_common::util::resource_util::cpu::total_cpu_available;
42use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
43use risingwave_error::bail;
44use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
45use risingwave_hummock_sdk::compaction_group::StateTableId;
46use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
47use risingwave_hummock_sdk::{
48 CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
49};
50use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
51use risingwave_pb::backup_service::*;
52use risingwave_pb::catalog::{
53 Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
54 PbSubscription, PbTable, PbView, Table,
55};
56use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
57use risingwave_pb::cloud_service::*;
58use risingwave_pb::common::worker_node::Property;
59use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
60use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
61use risingwave_pb::ddl_service::alter_owner_request::Object;
62use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
63use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
64use risingwave_pb::ddl_service::drop_table_request::SourceId;
65use risingwave_pb::ddl_service::*;
66use risingwave_pb::hummock::compact_task::TaskStatus;
67use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
68use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
69use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
70use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
71use risingwave_pb::hummock::write_limits::WriteLimit;
72use risingwave_pb::hummock::*;
73use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
74use risingwave_pb::iceberg_compaction::{
75 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
76 subscribe_iceberg_compaction_event_request,
77};
78use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
79use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
80use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
81use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
82use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
83use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
84use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
85use risingwave_pb::meta::list_actor_states_response::ActorState;
86use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
87use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
88use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
89use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
90use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
91use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
92use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
93use risingwave_pb::meta::serving_service_client::ServingServiceClient;
94use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
95use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
96use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
97use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
98use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
99use risingwave_pb::meta::{FragmentDistribution, *};
100use risingwave_pb::secret::PbSecretRef;
101use risingwave_pb::stream_plan::StreamFragmentGraph;
102use risingwave_pb::user::update_user_request::UpdateField;
103use risingwave_pb::user::user_service_client::UserServiceClient;
104use risingwave_pb::user::*;
105use thiserror_ext::AsReport;
106use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
107use tokio::sync::oneshot::Sender;
108use tokio::sync::{RwLock, mpsc, oneshot};
109use tokio::task::JoinHandle;
110use tokio::time::{self};
111use tokio_retry::strategy::{ExponentialBackoff, jitter};
112use tokio_stream::wrappers::UnboundedReceiverStream;
113use tonic::transport::Endpoint;
114use tonic::{Code, Request, Streaming};
115
116use crate::channel::{Channel, WrappedChannelExt};
117use crate::error::{Result, RpcError};
118use crate::hummock_meta_client::{
119 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
120 IcebergCompactionEventItem,
121};
122use crate::meta_rpc_client_method_impl;
123
124type ConnectionId = u32;
125type DatabaseId = u32;
126type SchemaId = u32;
127
128#[derive(Clone, Debug)]
130pub struct MetaClient {
131 worker_id: u32,
132 worker_type: WorkerType,
133 host_addr: HostAddr,
134 inner: GrpcMetaClient,
135 meta_config: MetaConfig,
136 cluster_id: String,
137 shutting_down: Arc<AtomicBool>,
138}
139
140impl MetaClient {
141 pub fn worker_id(&self) -> u32 {
142 self.worker_id
143 }
144
145 pub fn host_addr(&self) -> &HostAddr {
146 &self.host_addr
147 }
148
149 pub fn worker_type(&self) -> WorkerType {
150 self.worker_type
151 }
152
153 pub fn cluster_id(&self) -> &str {
154 &self.cluster_id
155 }
156
157 pub async fn subscribe(
159 &self,
160 subscribe_type: SubscribeType,
161 ) -> Result<Streaming<SubscribeResponse>> {
162 let request = SubscribeRequest {
163 subscribe_type: subscribe_type as i32,
164 host: Some(self.host_addr.to_protobuf()),
165 worker_id: self.worker_id(),
166 };
167
168 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
169 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
170 true,
171 );
172
173 tokio_retry::Retry::spawn(retry_strategy, || async {
174 let request = request.clone();
175 self.inner.subscribe(request).await
176 })
177 .await
178 }
179
180 pub async fn create_connection(
181 &self,
182 connection_name: String,
183 database_id: u32,
184 schema_id: u32,
185 owner_id: u32,
186 req: create_connection_request::Payload,
187 ) -> Result<WaitVersion> {
188 let request = CreateConnectionRequest {
189 name: connection_name,
190 database_id,
191 schema_id,
192 owner_id,
193 payload: Some(req),
194 };
195 let resp = self.inner.create_connection(request).await?;
196 Ok(resp
197 .version
198 .ok_or_else(|| anyhow!("wait version not set"))?)
199 }
200
201 pub async fn create_secret(
202 &self,
203 secret_name: String,
204 database_id: u32,
205 schema_id: u32,
206 owner_id: u32,
207 value: Vec<u8>,
208 ) -> Result<WaitVersion> {
209 let request = CreateSecretRequest {
210 name: secret_name,
211 database_id,
212 schema_id,
213 owner_id,
214 value,
215 };
216 let resp = self.inner.create_secret(request).await?;
217 Ok(resp
218 .version
219 .ok_or_else(|| anyhow!("wait version not set"))?)
220 }
221
222 pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
223 let request = ListConnectionsRequest {};
224 let resp = self.inner.list_connections(request).await?;
225 Ok(resp.connections)
226 }
227
228 pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result<WaitVersion> {
229 let request = DropConnectionRequest { connection_id };
230 let resp = self.inner.drop_connection(request).await?;
231 Ok(resp
232 .version
233 .ok_or_else(|| anyhow!("wait version not set"))?)
234 }
235
236 pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
237 let request = DropSecretRequest {
238 secret_id: secret_id.into(),
239 };
240 let resp = self.inner.drop_secret(request).await?;
241 Ok(resp
242 .version
243 .ok_or_else(|| anyhow!("wait version not set"))?)
244 }
245
246 pub async fn register_new(
250 addr_strategy: MetaAddressStrategy,
251 worker_type: WorkerType,
252 addr: &HostAddr,
253 property: Property,
254 meta_config: &MetaConfig,
255 ) -> (Self, SystemParamsReader) {
256 let ret =
257 Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
258
259 match ret {
260 Ok(ret) => ret,
261 Err(err) => {
262 tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
263 std::process::exit(1);
264 }
265 }
266 }
267
268 async fn register_new_inner(
269 addr_strategy: MetaAddressStrategy,
270 worker_type: WorkerType,
271 addr: &HostAddr,
272 property: Property,
273 meta_config: &MetaConfig,
274 ) -> Result<(Self, SystemParamsReader)> {
275 tracing::info!("register meta client using strategy: {}", addr_strategy);
276
277 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
279 Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
280 true,
281 );
282
283 if property.is_unschedulable {
284 tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
285 }
286 let init_result: Result<_> = tokio_retry::RetryIf::spawn(
287 retry_strategy,
288 || async {
289 let grpc_meta_client =
290 GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
291
292 let add_worker_resp = grpc_meta_client
293 .add_worker_node(AddWorkerNodeRequest {
294 worker_type: worker_type as i32,
295 host: Some(addr.to_protobuf()),
296 property: Some(property.clone()),
297 resource: Some(risingwave_pb::common::worker_node::Resource {
298 rw_version: RW_VERSION.to_owned(),
299 total_memory_bytes: system_memory_available_bytes() as _,
300 total_cpu_cores: total_cpu_available() as _,
301 }),
302 })
303 .await
304 .context("failed to add worker node")?;
305
306 let system_params_resp = grpc_meta_client
307 .get_system_params(GetSystemParamsRequest {})
308 .await
309 .context("failed to get initial system params")?;
310
311 Ok((add_worker_resp, system_params_resp, grpc_meta_client))
312 },
313 |e: &RpcError| !e.is_from_tonic_server_impl(),
316 )
317 .await;
318
319 let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
320 let worker_id = add_worker_resp
321 .node_id
322 .expect("AddWorkerNodeResponse::node_id is empty");
323
324 let meta_client = Self {
325 worker_id,
326 worker_type,
327 host_addr: addr.clone(),
328 inner: grpc_meta_client,
329 meta_config: meta_config.to_owned(),
330 cluster_id: add_worker_resp.cluster_id,
331 shutting_down: Arc::new(false.into()),
332 };
333
334 static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
335 REPORT_PANIC.call_once(|| {
336 let meta_client_clone = meta_client.clone();
337 std::panic::update_hook(move |default_hook, info| {
338 meta_client_clone.try_add_panic_event_blocking(info, None);
340 default_hook(info);
341 });
342 });
343
344 Ok((meta_client, system_params_resp.params.unwrap().into()))
345 }
346
347 pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
349 let request = ActivateWorkerNodeRequest {
350 host: Some(addr.to_protobuf()),
351 node_id: self.worker_id,
352 };
353 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
354 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
355 true,
356 );
357 tokio_retry::Retry::spawn(retry_strategy, || async {
358 let request = request.clone();
359 self.inner.activate_worker_node(request).await
360 })
361 .await?;
362
363 Ok(())
364 }
365
366 pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
368 let request = HeartbeatRequest { node_id };
369 let resp = self.inner.heartbeat(request).await?;
370 if let Some(status) = resp.status {
371 if status.code() == risingwave_pb::common::status::Code::UnknownWorker {
372 if !self.shutting_down.load(Relaxed) {
375 tracing::error!(message = status.message, "worker expired");
376 std::process::exit(1);
377 }
378 }
379 }
380 Ok(())
381 }
382
383 pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
384 let request = CreateDatabaseRequest { db: Some(db) };
385 let resp = self.inner.create_database(request).await?;
386 Ok(resp
388 .version
389 .ok_or_else(|| anyhow!("wait version not set"))?)
390 }
391
392 pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
393 let request = CreateSchemaRequest {
394 schema: Some(schema),
395 };
396 let resp = self.inner.create_schema(request).await?;
397 Ok(resp
399 .version
400 .ok_or_else(|| anyhow!("wait version not set"))?)
401 }
402
403 pub async fn create_materialized_view(
404 &self,
405 table: PbTable,
406 graph: StreamFragmentGraph,
407 dependencies: HashSet<ObjectId>,
408 specific_resource_group: Option<String>,
409 ) -> Result<WaitVersion> {
410 let request = CreateMaterializedViewRequest {
411 materialized_view: Some(table),
412 fragment_graph: Some(graph),
413 backfill: PbBackfillType::Regular as _,
414 dependencies: dependencies.into_iter().collect(),
415 specific_resource_group,
416 };
417 let resp = self.inner.create_materialized_view(request).await?;
418 Ok(resp
420 .version
421 .ok_or_else(|| anyhow!("wait version not set"))?)
422 }
423
424 pub async fn drop_materialized_view(
425 &self,
426 table_id: TableId,
427 cascade: bool,
428 ) -> Result<WaitVersion> {
429 let request = DropMaterializedViewRequest {
430 table_id: table_id.table_id(),
431 cascade,
432 };
433
434 let resp = self.inner.drop_materialized_view(request).await?;
435 Ok(resp
436 .version
437 .ok_or_else(|| anyhow!("wait version not set"))?)
438 }
439
440 pub async fn create_source(
441 &self,
442 source: PbSource,
443 graph: Option<StreamFragmentGraph>,
444 ) -> Result<WaitVersion> {
445 let request = CreateSourceRequest {
446 source: Some(source),
447 fragment_graph: graph,
448 };
449
450 let resp = self.inner.create_source(request).await?;
451 Ok(resp
452 .version
453 .ok_or_else(|| anyhow!("wait version not set"))?)
454 }
455
456 pub async fn create_sink(
457 &self,
458 sink: PbSink,
459 graph: StreamFragmentGraph,
460 affected_table_change: Option<ReplaceJobPlan>,
461 dependencies: HashSet<ObjectId>,
462 ) -> Result<WaitVersion> {
463 let request = CreateSinkRequest {
464 sink: Some(sink),
465 fragment_graph: Some(graph),
466 affected_table_change,
467 dependencies: dependencies.into_iter().collect(),
468 };
469
470 let resp = self.inner.create_sink(request).await?;
471 Ok(resp
472 .version
473 .ok_or_else(|| anyhow!("wait version not set"))?)
474 }
475
476 pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
477 let request = CreateSubscriptionRequest {
478 subscription: Some(subscription),
479 };
480
481 let resp = self.inner.create_subscription(request).await?;
482 Ok(resp
483 .version
484 .ok_or_else(|| anyhow!("wait version not set"))?)
485 }
486
487 pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
488 let request = CreateFunctionRequest {
489 function: Some(function),
490 };
491 let resp = self.inner.create_function(request).await?;
492 Ok(resp
493 .version
494 .ok_or_else(|| anyhow!("wait version not set"))?)
495 }
496
497 pub async fn create_table(
498 &self,
499 source: Option<PbSource>,
500 table: PbTable,
501 graph: StreamFragmentGraph,
502 job_type: PbTableJobType,
503 ) -> Result<WaitVersion> {
504 let request = CreateTableRequest {
505 materialized_view: Some(table),
506 fragment_graph: Some(graph),
507 source,
508 job_type: job_type as _,
509 };
510 let resp = self.inner.create_table(request).await?;
511 Ok(resp
513 .version
514 .ok_or_else(|| anyhow!("wait version not set"))?)
515 }
516
517 pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
518 let request = CommentOnRequest {
519 comment: Some(comment),
520 };
521 let resp = self.inner.comment_on(request).await?;
522 Ok(resp
523 .version
524 .ok_or_else(|| anyhow!("wait version not set"))?)
525 }
526
527 pub async fn alter_name(
528 &self,
529 object: alter_name_request::Object,
530 name: &str,
531 ) -> Result<WaitVersion> {
532 let request = AlterNameRequest {
533 object: Some(object),
534 new_name: name.to_owned(),
535 };
536 let resp = self.inner.alter_name(request).await?;
537 Ok(resp
538 .version
539 .ok_or_else(|| anyhow!("wait version not set"))?)
540 }
541
542 pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
543 let request = AlterOwnerRequest {
544 object: Some(object),
545 owner_id,
546 };
547 let resp = self.inner.alter_owner(request).await?;
548 Ok(resp
549 .version
550 .ok_or_else(|| anyhow!("wait version not set"))?)
551 }
552
553 pub async fn alter_set_schema(
554 &self,
555 object: alter_set_schema_request::Object,
556 new_schema_id: u32,
557 ) -> Result<WaitVersion> {
558 let request = AlterSetSchemaRequest {
559 new_schema_id,
560 object: Some(object),
561 };
562 let resp = self.inner.alter_set_schema(request).await?;
563 Ok(resp
564 .version
565 .ok_or_else(|| anyhow!("wait version not set"))?)
566 }
567
568 pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
569 let request = AlterSourceRequest {
570 source: Some(source),
571 };
572 let resp = self.inner.alter_source(request).await?;
573 Ok(resp
574 .version
575 .ok_or_else(|| anyhow!("wait version not set"))?)
576 }
577
578 pub async fn alter_parallelism(
579 &self,
580 table_id: u32,
581 parallelism: PbTableParallelism,
582 deferred: bool,
583 ) -> Result<()> {
584 let request = AlterParallelismRequest {
585 table_id,
586 parallelism: Some(parallelism),
587 deferred,
588 };
589
590 self.inner.alter_parallelism(request).await?;
591 Ok(())
592 }
593
594 pub async fn alter_resource_group(
595 &self,
596 table_id: u32,
597 resource_group: Option<String>,
598 deferred: bool,
599 ) -> Result<()> {
600 let request = AlterResourceGroupRequest {
601 table_id,
602 resource_group,
603 deferred,
604 };
605
606 self.inner.alter_resource_group(request).await?;
607 Ok(())
608 }
609
610 pub async fn alter_swap_rename(
611 &self,
612 object: alter_swap_rename_request::Object,
613 ) -> Result<WaitVersion> {
614 let request = AlterSwapRenameRequest {
615 object: Some(object),
616 };
617 let resp = self.inner.alter_swap_rename(request).await?;
618 Ok(resp
619 .version
620 .ok_or_else(|| anyhow!("wait version not set"))?)
621 }
622
623 pub async fn alter_secret(
624 &self,
625 secret_id: u32,
626 secret_name: String,
627 database_id: u32,
628 schema_id: u32,
629 owner_id: u32,
630 value: Vec<u8>,
631 ) -> Result<WaitVersion> {
632 let request = AlterSecretRequest {
633 secret_id,
634 name: secret_name,
635 database_id,
636 schema_id,
637 owner_id,
638 value,
639 };
640 let resp = self.inner.alter_secret(request).await?;
641 Ok(resp
642 .version
643 .ok_or_else(|| anyhow!("wait version not set"))?)
644 }
645
646 pub async fn replace_job(
647 &self,
648 graph: StreamFragmentGraph,
649 table_col_index_mapping: ColIndexMapping,
650 replace_job: ReplaceJob,
651 ) -> Result<WaitVersion> {
652 let request = ReplaceJobPlanRequest {
653 plan: Some(ReplaceJobPlan {
654 fragment_graph: Some(graph),
655 table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()),
656 replace_job: Some(replace_job),
657 }),
658 };
659 let resp = self.inner.replace_job_plan(request).await?;
660 Ok(resp
662 .version
663 .ok_or_else(|| anyhow!("wait version not set"))?)
664 }
665
666 pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
667 let request = AutoSchemaChangeRequest {
668 schema_change: Some(schema_change),
669 };
670 let _ = self.inner.auto_schema_change(request).await?;
671 Ok(())
672 }
673
674 pub async fn create_view(&self, view: PbView) -> Result<WaitVersion> {
675 let request = CreateViewRequest { view: Some(view) };
676 let resp = self.inner.create_view(request).await?;
677 Ok(resp
679 .version
680 .ok_or_else(|| anyhow!("wait version not set"))?)
681 }
682
683 pub async fn create_index(
684 &self,
685 index: PbIndex,
686 table: PbTable,
687 graph: StreamFragmentGraph,
688 ) -> Result<WaitVersion> {
689 let request = CreateIndexRequest {
690 index: Some(index),
691 index_table: Some(table),
692 fragment_graph: Some(graph),
693 };
694 let resp = self.inner.create_index(request).await?;
695 Ok(resp
697 .version
698 .ok_or_else(|| anyhow!("wait version not set"))?)
699 }
700
701 pub async fn drop_table(
702 &self,
703 source_id: Option<u32>,
704 table_id: TableId,
705 cascade: bool,
706 ) -> Result<WaitVersion> {
707 let request = DropTableRequest {
708 source_id: source_id.map(SourceId::Id),
709 table_id: table_id.table_id(),
710 cascade,
711 };
712
713 let resp = self.inner.drop_table(request).await?;
714 Ok(resp
715 .version
716 .ok_or_else(|| anyhow!("wait version not set"))?)
717 }
718
719 pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
720 let request = DropViewRequest { view_id, cascade };
721 let resp = self.inner.drop_view(request).await?;
722 Ok(resp
723 .version
724 .ok_or_else(|| anyhow!("wait version not set"))?)
725 }
726
727 pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
728 let request = DropSourceRequest { source_id, cascade };
729 let resp = self.inner.drop_source(request).await?;
730 Ok(resp
731 .version
732 .ok_or_else(|| anyhow!("wait version not set"))?)
733 }
734
735 pub async fn drop_sink(
736 &self,
737 sink_id: u32,
738 cascade: bool,
739 affected_table_change: Option<ReplaceJobPlan>,
740 ) -> Result<WaitVersion> {
741 let request = DropSinkRequest {
742 sink_id,
743 cascade,
744 affected_table_change,
745 };
746 let resp = self.inner.drop_sink(request).await?;
747 Ok(resp
748 .version
749 .ok_or_else(|| anyhow!("wait version not set"))?)
750 }
751
752 pub async fn drop_subscription(
753 &self,
754 subscription_id: u32,
755 cascade: bool,
756 ) -> Result<WaitVersion> {
757 let request = DropSubscriptionRequest {
758 subscription_id,
759 cascade,
760 };
761 let resp = self.inner.drop_subscription(request).await?;
762 Ok(resp
763 .version
764 .ok_or_else(|| anyhow!("wait version not set"))?)
765 }
766
767 pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
768 let request = DropIndexRequest {
769 index_id: index_id.index_id,
770 cascade,
771 };
772 let resp = self.inner.drop_index(request).await?;
773 Ok(resp
774 .version
775 .ok_or_else(|| anyhow!("wait version not set"))?)
776 }
777
778 pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
779 let request = DropFunctionRequest {
780 function_id: function_id.0,
781 };
782 let resp = self.inner.drop_function(request).await?;
783 Ok(resp
784 .version
785 .ok_or_else(|| anyhow!("wait version not set"))?)
786 }
787
788 pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
789 let request = DropDatabaseRequest { database_id };
790 let resp = self.inner.drop_database(request).await?;
791 Ok(resp
792 .version
793 .ok_or_else(|| anyhow!("wait version not set"))?)
794 }
795
796 pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
797 let request = DropSchemaRequest { schema_id, cascade };
798 let resp = self.inner.drop_schema(request).await?;
799 Ok(resp
800 .version
801 .ok_or_else(|| anyhow!("wait version not set"))?)
802 }
803
804 pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
806 let request = CreateUserRequest { user: Some(user) };
807 let resp = self.inner.create_user(request).await?;
808 Ok(resp.version)
809 }
810
811 pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
812 let request = DropUserRequest { user_id };
813 let resp = self.inner.drop_user(request).await?;
814 Ok(resp.version)
815 }
816
817 pub async fn update_user(
818 &self,
819 user: UserInfo,
820 update_fields: Vec<UpdateField>,
821 ) -> Result<u64> {
822 let request = UpdateUserRequest {
823 user: Some(user),
824 update_fields: update_fields
825 .into_iter()
826 .map(|field| field as i32)
827 .collect::<Vec<_>>(),
828 };
829 let resp = self.inner.update_user(request).await?;
830 Ok(resp.version)
831 }
832
833 pub async fn grant_privilege(
834 &self,
835 user_ids: Vec<u32>,
836 privileges: Vec<GrantPrivilege>,
837 with_grant_option: bool,
838 granted_by: u32,
839 ) -> Result<u64> {
840 let request = GrantPrivilegeRequest {
841 user_ids,
842 privileges,
843 with_grant_option,
844 granted_by,
845 };
846 let resp = self.inner.grant_privilege(request).await?;
847 Ok(resp.version)
848 }
849
850 pub async fn revoke_privilege(
851 &self,
852 user_ids: Vec<u32>,
853 privileges: Vec<GrantPrivilege>,
854 granted_by: u32,
855 revoke_by: u32,
856 revoke_grant_option: bool,
857 cascade: bool,
858 ) -> Result<u64> {
859 let request = RevokePrivilegeRequest {
860 user_ids,
861 privileges,
862 granted_by,
863 revoke_by,
864 revoke_grant_option,
865 cascade,
866 };
867 let resp = self.inner.revoke_privilege(request).await?;
868 Ok(resp.version)
869 }
870
871 pub async fn unregister(&self) -> Result<()> {
873 let request = DeleteWorkerNodeRequest {
874 host: Some(self.host_addr.to_protobuf()),
875 };
876 self.inner.delete_worker_node(request).await?;
877 self.shutting_down.store(true, Relaxed);
878 Ok(())
879 }
880
881 pub async fn try_unregister(&self) {
883 match self.unregister().await {
884 Ok(_) => {
885 tracing::info!(
886 worker_id = self.worker_id(),
887 "successfully unregistered from meta service",
888 )
889 }
890 Err(e) => {
891 tracing::warn!(
892 error = %e.as_report(),
893 worker_id = self.worker_id(),
894 "failed to unregister from meta service",
895 );
896 }
897 }
898 }
899
900 pub async fn update_schedulability(
901 &self,
902 worker_ids: &[u32],
903 schedulability: Schedulability,
904 ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
905 let request = UpdateWorkerNodeSchedulabilityRequest {
906 worker_ids: worker_ids.to_vec(),
907 schedulability: schedulability.into(),
908 };
909 let resp = self
910 .inner
911 .update_worker_node_schedulability(request)
912 .await?;
913 Ok(resp)
914 }
915
916 pub async fn list_worker_nodes(
917 &self,
918 worker_type: Option<WorkerType>,
919 ) -> Result<Vec<WorkerNode>> {
920 let request = ListAllNodesRequest {
921 worker_type: worker_type.map(Into::into),
922 include_starting_nodes: true,
923 };
924 let resp = self.inner.list_all_nodes(request).await?;
925 Ok(resp.nodes)
926 }
927
928 pub fn start_heartbeat_loop(
930 meta_client: MetaClient,
931 min_interval: Duration,
932 ) -> (JoinHandle<()>, Sender<()>) {
933 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
934 let join_handle = tokio::spawn(async move {
935 let mut min_interval_ticker = tokio::time::interval(min_interval);
936 loop {
937 tokio::select! {
938 biased;
939 _ = &mut shutdown_rx => {
941 tracing::info!("Heartbeat loop is stopped");
942 return;
943 }
944 _ = min_interval_ticker.tick() => {},
946 }
947 tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
948 match tokio::time::timeout(
949 min_interval * 3,
951 meta_client.send_heartbeat(meta_client.worker_id()),
952 )
953 .await
954 {
955 Ok(Ok(_)) => {}
956 Ok(Err(err)) => {
957 tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
958 }
959 Err(_) => {
960 tracing::warn!("Failed to send_heartbeat: timeout");
961 }
962 }
963 }
964 });
965 (join_handle, shutdown_tx)
966 }
967
968 pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
969 let request = RisectlListStateTablesRequest {};
970 let resp = self.inner.risectl_list_state_tables(request).await?;
971 Ok(resp.tables)
972 }
973
974 pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
975 let request = FlushRequest { database_id };
976 let resp = self.inner.flush(request).await?;
977 Ok(HummockVersionId::new(resp.hummock_version_id))
978 }
979
980 pub async fn wait(&self) -> Result<()> {
981 let request = WaitRequest {};
982 self.inner.wait(request).await?;
983 Ok(())
984 }
985
986 pub async fn recover(&self) -> Result<()> {
987 let request = RecoverRequest {};
988 self.inner.recover(request).await?;
989 Ok(())
990 }
991
992 pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
993 let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
994 let resp = self.inner.cancel_creating_jobs(request).await?;
995 Ok(resp.canceled_jobs)
996 }
997
998 pub async fn list_table_fragments(
999 &self,
1000 table_ids: &[u32],
1001 ) -> Result<HashMap<u32, TableFragmentInfo>> {
1002 let request = ListTableFragmentsRequest {
1003 table_ids: table_ids.to_vec(),
1004 };
1005 let resp = self.inner.list_table_fragments(request).await?;
1006 Ok(resp.table_fragments)
1007 }
1008
1009 pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1010 let resp = self
1011 .inner
1012 .list_streaming_job_states(ListStreamingJobStatesRequest {})
1013 .await?;
1014 Ok(resp.states)
1015 }
1016
1017 pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
1018 let resp = self
1019 .inner
1020 .list_fragment_distribution(ListFragmentDistributionRequest {})
1021 .await?;
1022 Ok(resp.distributions)
1023 }
1024
1025 pub async fn get_fragment_by_id(
1026 &self,
1027 fragment_id: u32,
1028 ) -> Result<Option<FragmentDistribution>> {
1029 let resp = self
1030 .inner
1031 .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1032 .await?;
1033 Ok(resp.distribution)
1034 }
1035
1036 pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1037 let resp = self
1038 .inner
1039 .list_actor_states(ListActorStatesRequest {})
1040 .await?;
1041 Ok(resp.states)
1042 }
1043
1044 pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1045 let resp = self
1046 .inner
1047 .list_actor_splits(ListActorSplitsRequest {})
1048 .await?;
1049
1050 Ok(resp.actor_splits)
1051 }
1052
1053 pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
1054 let resp = self
1055 .inner
1056 .list_object_dependencies(ListObjectDependenciesRequest {})
1057 .await?;
1058 Ok(resp.dependencies)
1059 }
1060
1061 pub async fn pause(&self) -> Result<PauseResponse> {
1062 let request = PauseRequest {};
1063 let resp = self.inner.pause(request).await?;
1064 Ok(resp)
1065 }
1066
1067 pub async fn resume(&self) -> Result<ResumeResponse> {
1068 let request = ResumeRequest {};
1069 let resp = self.inner.resume(request).await?;
1070 Ok(resp)
1071 }
1072
1073 pub async fn apply_throttle(
1074 &self,
1075 kind: PbThrottleTarget,
1076 id: u32,
1077 rate: Option<u32>,
1078 ) -> Result<ApplyThrottleResponse> {
1079 let request = ApplyThrottleRequest {
1080 kind: kind as i32,
1081 id,
1082 rate,
1083 };
1084 let resp = self.inner.apply_throttle(request).await?;
1085 Ok(resp)
1086 }
1087
1088 pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1089 let resp = self
1090 .inner
1091 .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1092 .await?;
1093 Ok(resp.get_status().unwrap())
1094 }
1095
1096 pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1097 let request = GetClusterInfoRequest {};
1098 let resp = self.inner.get_cluster_info(request).await?;
1099 Ok(resp)
1100 }
1101
1102 pub async fn reschedule(
1103 &self,
1104 worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1105 revision: u64,
1106 resolve_no_shuffle_upstream: bool,
1107 ) -> Result<(bool, u64)> {
1108 let request = RescheduleRequest {
1109 revision,
1110 resolve_no_shuffle_upstream,
1111 worker_reschedules,
1112 };
1113 let resp = self.inner.reschedule(request).await?;
1114 Ok((resp.success, resp.revision))
1115 }
1116
1117 pub async fn risectl_get_pinned_versions_summary(
1118 &self,
1119 ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1120 let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1121 self.inner
1122 .rise_ctl_get_pinned_versions_summary(request)
1123 .await
1124 }
1125
1126 pub async fn risectl_get_checkpoint_hummock_version(
1127 &self,
1128 ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1129 let request = RiseCtlGetCheckpointVersionRequest {};
1130 self.inner.rise_ctl_get_checkpoint_version(request).await
1131 }
1132
1133 pub async fn risectl_pause_hummock_version_checkpoint(
1134 &self,
1135 ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1136 let request = RiseCtlPauseVersionCheckpointRequest {};
1137 self.inner.rise_ctl_pause_version_checkpoint(request).await
1138 }
1139
1140 pub async fn risectl_resume_hummock_version_checkpoint(
1141 &self,
1142 ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1143 let request = RiseCtlResumeVersionCheckpointRequest {};
1144 self.inner.rise_ctl_resume_version_checkpoint(request).await
1145 }
1146
1147 pub async fn init_metadata_for_replay(
1148 &self,
1149 tables: Vec<PbTable>,
1150 compaction_groups: Vec<CompactionGroupInfo>,
1151 ) -> Result<()> {
1152 let req = InitMetadataForReplayRequest {
1153 tables,
1154 compaction_groups,
1155 };
1156 let _resp = self.inner.init_metadata_for_replay(req).await?;
1157 Ok(())
1158 }
1159
1160 pub async fn replay_version_delta(
1161 &self,
1162 version_delta: HummockVersionDelta,
1163 ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1164 let req = ReplayVersionDeltaRequest {
1165 version_delta: Some(version_delta.into()),
1166 };
1167 let resp = self.inner.replay_version_delta(req).await?;
1168 Ok((
1169 HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1170 resp.modified_compaction_groups,
1171 ))
1172 }
1173
1174 pub async fn list_version_deltas(
1175 &self,
1176 start_id: HummockVersionId,
1177 num_limit: u32,
1178 committed_epoch_limit: HummockEpoch,
1179 ) -> Result<Vec<HummockVersionDelta>> {
1180 let req = ListVersionDeltasRequest {
1181 start_id: start_id.to_u64(),
1182 num_limit,
1183 committed_epoch_limit,
1184 };
1185 Ok(self
1186 .inner
1187 .list_version_deltas(req)
1188 .await?
1189 .version_deltas
1190 .unwrap()
1191 .version_deltas
1192 .iter()
1193 .map(HummockVersionDelta::from_rpc_protobuf)
1194 .collect())
1195 }
1196
1197 pub async fn trigger_compaction_deterministic(
1198 &self,
1199 version_id: HummockVersionId,
1200 compaction_groups: Vec<CompactionGroupId>,
1201 ) -> Result<()> {
1202 let req = TriggerCompactionDeterministicRequest {
1203 version_id: version_id.to_u64(),
1204 compaction_groups,
1205 };
1206 self.inner.trigger_compaction_deterministic(req).await?;
1207 Ok(())
1208 }
1209
1210 pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1211 let req = DisableCommitEpochRequest {};
1212 Ok(HummockVersion::from_rpc_protobuf(
1213 &self
1214 .inner
1215 .disable_commit_epoch(req)
1216 .await?
1217 .current_version
1218 .unwrap(),
1219 ))
1220 }
1221
1222 pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1223 let req = GetAssignedCompactTaskNumRequest {};
1224 let resp = self.inner.get_assigned_compact_task_num(req).await?;
1225 Ok(resp.num_tasks as usize)
1226 }
1227
1228 pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1229 let req = RiseCtlListCompactionGroupRequest {};
1230 let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1231 Ok(resp.compaction_groups)
1232 }
1233
1234 pub async fn risectl_update_compaction_config(
1235 &self,
1236 compaction_groups: &[CompactionGroupId],
1237 configs: &[MutableConfig],
1238 ) -> Result<()> {
1239 let req = RiseCtlUpdateCompactionConfigRequest {
1240 compaction_group_ids: compaction_groups.to_vec(),
1241 configs: configs
1242 .iter()
1243 .map(
1244 |c| rise_ctl_update_compaction_config_request::MutableConfig {
1245 mutable_config: Some(c.clone()),
1246 },
1247 )
1248 .collect(),
1249 };
1250 let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1251 Ok(())
1252 }
1253
1254 pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1255 let req = BackupMetaRequest { remarks };
1256 let resp = self.inner.backup_meta(req).await?;
1257 Ok(resp.job_id)
1258 }
1259
1260 pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1261 let req = GetBackupJobStatusRequest { job_id };
1262 let resp = self.inner.get_backup_job_status(req).await?;
1263 Ok((resp.job_status(), resp.message))
1264 }
1265
1266 pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1267 let req = DeleteMetaSnapshotRequest {
1268 snapshot_ids: snapshot_ids.to_vec(),
1269 };
1270 let _resp = self.inner.delete_meta_snapshot(req).await?;
1271 Ok(())
1272 }
1273
1274 pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1275 let req = GetMetaSnapshotManifestRequest {};
1276 let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1277 Ok(resp.manifest.expect("should exist"))
1278 }
1279
1280 pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1281 let req = GetTelemetryInfoRequest {};
1282 let resp = self.inner.get_telemetry_info(req).await?;
1283 Ok(resp)
1284 }
1285
1286 pub async fn get_system_params(&self) -> Result<SystemParamsReader> {
1287 let req = GetSystemParamsRequest {};
1288 let resp = self.inner.get_system_params(req).await?;
1289 Ok(resp.params.unwrap().into())
1290 }
1291
1292 pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1293 let req = GetMetaStoreInfoRequest {};
1294 let resp = self.inner.get_meta_store_info(req).await?;
1295 Ok(resp.meta_store_endpoint)
1296 }
1297
1298 pub async fn alter_sink_props(
1299 &self,
1300 sink_id: u32,
1301 changed_props: BTreeMap<String, String>,
1302 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1303 connector_conn_ref: Option<u32>,
1304 ) -> Result<()> {
1305 let req = AlterConnectorPropsRequest {
1306 object_id: sink_id,
1307 changed_props: changed_props.into_iter().collect(),
1308 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1309 connector_conn_ref,
1310 object_type: AlterConnectorPropsObject::Sink as i32,
1311 };
1312 let _resp = self.inner.alter_connector_props(req).await?;
1313 Ok(())
1314 }
1315
1316 pub async fn set_system_param(
1317 &self,
1318 param: String,
1319 value: Option<String>,
1320 ) -> Result<Option<SystemParamsReader>> {
1321 let req = SetSystemParamRequest { param, value };
1322 let resp = self.inner.set_system_param(req).await?;
1323 Ok(resp.params.map(SystemParamsReader::from))
1324 }
1325
1326 pub async fn get_session_params(&self) -> Result<String> {
1327 let req = GetSessionParamsRequest {};
1328 let resp = self.inner.get_session_params(req).await?;
1329 Ok(resp.params)
1330 }
1331
1332 pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1333 let req = SetSessionParamRequest { param, value };
1334 let resp = self.inner.set_session_param(req).await?;
1335 Ok(resp.param)
1336 }
1337
1338 pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1339 let req = GetDdlProgressRequest {};
1340 let resp = self.inner.get_ddl_progress(req).await?;
1341 Ok(resp.ddl_progress)
1342 }
1343
1344 pub async fn split_compaction_group(
1345 &self,
1346 group_id: CompactionGroupId,
1347 table_ids_to_new_group: &[StateTableId],
1348 partition_vnode_count: u32,
1349 ) -> Result<CompactionGroupId> {
1350 let req = SplitCompactionGroupRequest {
1351 group_id,
1352 table_ids: table_ids_to_new_group.to_vec(),
1353 partition_vnode_count,
1354 };
1355 let resp = self.inner.split_compaction_group(req).await?;
1356 Ok(resp.new_group_id)
1357 }
1358
1359 pub async fn get_tables(
1360 &self,
1361 table_ids: &[u32],
1362 include_dropped_tables: bool,
1363 ) -> Result<HashMap<u32, Table>> {
1364 let req = GetTablesRequest {
1365 table_ids: table_ids.to_vec(),
1366 include_dropped_tables,
1367 };
1368 let resp = self.inner.get_tables(req).await?;
1369 Ok(resp.tables)
1370 }
1371
1372 pub async fn list_serving_vnode_mappings(
1373 &self,
1374 ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
1375 let req = GetServingVnodeMappingsRequest {};
1376 let resp = self.inner.get_serving_vnode_mappings(req).await?;
1377 let mappings = resp
1378 .worker_slot_mappings
1379 .into_iter()
1380 .map(|p| {
1381 (
1382 p.fragment_id,
1383 (
1384 resp.fragment_to_table
1385 .get(&p.fragment_id)
1386 .cloned()
1387 .unwrap_or(0),
1388 WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1389 ),
1390 )
1391 })
1392 .collect();
1393 Ok(mappings)
1394 }
1395
1396 pub async fn risectl_list_compaction_status(
1397 &self,
1398 ) -> Result<(
1399 Vec<CompactStatus>,
1400 Vec<CompactTaskAssignment>,
1401 Vec<CompactTaskProgress>,
1402 )> {
1403 let req = RiseCtlListCompactionStatusRequest {};
1404 let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1405 Ok((
1406 resp.compaction_statuses,
1407 resp.task_assignment,
1408 resp.task_progress,
1409 ))
1410 }
1411
1412 pub async fn get_compaction_score(
1413 &self,
1414 compaction_group_id: CompactionGroupId,
1415 ) -> Result<Vec<PickerInfo>> {
1416 let req = GetCompactionScoreRequest {
1417 compaction_group_id,
1418 };
1419 let resp = self.inner.get_compaction_score(req).await?;
1420 Ok(resp.scores)
1421 }
1422
1423 pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1424 let req = RiseCtlRebuildTableStatsRequest {};
1425 let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1426 Ok(())
1427 }
1428
1429 pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1430 let req = ListBranchedObjectRequest {};
1431 let resp = self.inner.list_branched_object(req).await?;
1432 Ok(resp.branched_objects)
1433 }
1434
1435 pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
1436 let req = ListActiveWriteLimitRequest {};
1437 let resp = self.inner.list_active_write_limit(req).await?;
1438 Ok(resp.write_limits)
1439 }
1440
1441 pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1442 let req = ListHummockMetaConfigRequest {};
1443 let resp = self.inner.list_hummock_meta_config(req).await?;
1444 Ok(resp.configs)
1445 }
1446
1447 pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1448 let _resp = self
1449 .inner
1450 .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1451 .await?;
1452
1453 Ok(())
1454 }
1455
1456 pub async fn rw_cloud_validate_source(
1457 &self,
1458 source_type: SourceType,
1459 source_config: HashMap<String, String>,
1460 ) -> Result<RwCloudValidateSourceResponse> {
1461 let req = RwCloudValidateSourceRequest {
1462 source_type: source_type.into(),
1463 source_config,
1464 };
1465 let resp = self.inner.rw_cloud_validate_source(req).await?;
1466 Ok(resp)
1467 }
1468
1469 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1470 self.inner.core.read().await.sink_coordinate_client.clone()
1471 }
1472
1473 pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1474 let req = ListCompactTaskAssignmentRequest {};
1475 let resp = self.inner.list_compact_task_assignment(req).await?;
1476 Ok(resp.task_assignment)
1477 }
1478
1479 pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1480 let req = ListEventLogRequest::default();
1481 let resp = self.inner.list_event_log(req).await?;
1482 Ok(resp.event_logs)
1483 }
1484
1485 pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1486 let req = ListCompactTaskProgressRequest {};
1487 let resp = self.inner.list_compact_task_progress(req).await?;
1488 Ok(resp.task_progress)
1489 }
1490
1491 #[cfg(madsim)]
1492 pub fn try_add_panic_event_blocking(
1493 &self,
1494 panic_info: impl Display,
1495 timeout_millis: Option<u64>,
1496 ) {
1497 }
1498
1499 #[cfg(not(madsim))]
1501 pub fn try_add_panic_event_blocking(
1502 &self,
1503 panic_info: impl Display,
1504 timeout_millis: Option<u64>,
1505 ) {
1506 let event = event_log::EventWorkerNodePanic {
1507 worker_id: self.worker_id,
1508 worker_type: self.worker_type.into(),
1509 host_addr: Some(self.host_addr.to_protobuf()),
1510 panic_info: format!("{panic_info}"),
1511 };
1512 let grpc_meta_client = self.inner.clone();
1513 let _ = thread::spawn(move || {
1514 let rt = tokio::runtime::Builder::new_current_thread()
1515 .enable_all()
1516 .build()
1517 .unwrap();
1518 let req = AddEventLogRequest {
1519 event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1520 };
1521 rt.block_on(async {
1522 let _ = tokio::time::timeout(
1523 Duration::from_millis(timeout_millis.unwrap_or(1000)),
1524 grpc_meta_client.add_event_log(req),
1525 )
1526 .await;
1527 });
1528 })
1529 .join();
1530 }
1531
1532 pub async fn add_sink_fail_evet(
1533 &self,
1534 sink_id: u32,
1535 sink_name: String,
1536 connector: String,
1537 error: String,
1538 ) -> Result<()> {
1539 let event = event_log::EventSinkFail {
1540 sink_id,
1541 sink_name,
1542 connector,
1543 error,
1544 };
1545 let req = AddEventLogRequest {
1546 event: Some(add_event_log_request::Event::SinkFail(event)),
1547 };
1548 self.inner.add_event_log(req).await?;
1549 Ok(())
1550 }
1551
1552 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1553 let req = CancelCompactTaskRequest {
1554 task_id,
1555 task_status: task_status as _,
1556 };
1557 let resp = self.inner.cancel_compact_task(req).await?;
1558 Ok(resp.ret)
1559 }
1560
1561 pub async fn get_version_by_epoch(
1562 &self,
1563 epoch: HummockEpoch,
1564 table_id: u32,
1565 ) -> Result<PbHummockVersion> {
1566 let req = GetVersionByEpochRequest { epoch, table_id };
1567 let resp = self.inner.get_version_by_epoch(req).await?;
1568 Ok(resp.version.unwrap())
1569 }
1570
1571 pub async fn get_cluster_limits(
1572 &self,
1573 ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1574 let req = GetClusterLimitsRequest {};
1575 let resp = self.inner.get_cluster_limits(req).await?;
1576 Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1577 }
1578
1579 pub async fn merge_compaction_group(
1580 &self,
1581 left_group_id: CompactionGroupId,
1582 right_group_id: CompactionGroupId,
1583 ) -> Result<()> {
1584 let req = MergeCompactionGroupRequest {
1585 left_group_id,
1586 right_group_id,
1587 };
1588 self.inner.merge_compaction_group(req).await?;
1589 Ok(())
1590 }
1591
1592 pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1594 let request = ListRateLimitsRequest {};
1595 let resp = self.inner.list_rate_limits(request).await?;
1596 Ok(resp.rate_limits)
1597 }
1598
1599 pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1600 let request = ListIcebergTablesRequest {};
1601 let resp = self.inner.list_iceberg_tables(request).await?;
1602 Ok(resp.iceberg_tables)
1603 }
1604}
1605
1606#[async_trait]
1607impl HummockMetaClient for MetaClient {
1608 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1609 let req = UnpinVersionBeforeRequest {
1610 context_id: self.worker_id(),
1611 unpin_version_before: unpin_version_before.to_u64(),
1612 };
1613 self.inner.unpin_version_before(req).await?;
1614 Ok(())
1615 }
1616
1617 async fn get_current_version(&self) -> Result<HummockVersion> {
1618 let req = GetCurrentVersionRequest::default();
1619 Ok(HummockVersion::from_rpc_protobuf(
1620 &self
1621 .inner
1622 .get_current_version(req)
1623 .await?
1624 .current_version
1625 .unwrap(),
1626 ))
1627 }
1628
1629 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1630 let resp = self
1631 .inner
1632 .get_new_object_ids(GetNewObjectIdsRequest { number })
1633 .await?;
1634 Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1635 }
1636
1637 async fn commit_epoch_with_change_log(
1638 &self,
1639 _epoch: HummockEpoch,
1640 _sync_result: SyncResult,
1641 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1642 ) -> Result<()> {
1643 panic!("Only meta service can commit_epoch in production.")
1644 }
1645
1646 async fn trigger_manual_compaction(
1647 &self,
1648 compaction_group_id: u64,
1649 table_id: u32,
1650 level: u32,
1651 sst_ids: Vec<u64>,
1652 ) -> Result<()> {
1653 let req = TriggerManualCompactionRequest {
1655 compaction_group_id,
1656 table_id,
1657 level,
1660 sst_ids,
1661 ..Default::default()
1662 };
1663
1664 self.inner.trigger_manual_compaction(req).await?;
1665 Ok(())
1666 }
1667
1668 async fn trigger_full_gc(
1669 &self,
1670 sst_retention_time_sec: u64,
1671 prefix: Option<String>,
1672 ) -> Result<()> {
1673 self.inner
1674 .trigger_full_gc(TriggerFullGcRequest {
1675 sst_retention_time_sec,
1676 prefix,
1677 })
1678 .await?;
1679 Ok(())
1680 }
1681
1682 async fn subscribe_compaction_event(
1683 &self,
1684 ) -> Result<(
1685 UnboundedSender<SubscribeCompactionEventRequest>,
1686 BoxStream<'static, CompactionEventItem>,
1687 )> {
1688 let (request_sender, request_receiver) =
1689 unbounded_channel::<SubscribeCompactionEventRequest>();
1690 request_sender
1691 .send(SubscribeCompactionEventRequest {
1692 event: Some(subscribe_compaction_event_request::Event::Register(
1693 Register {
1694 context_id: self.worker_id(),
1695 },
1696 )),
1697 create_at: SystemTime::now()
1698 .duration_since(SystemTime::UNIX_EPOCH)
1699 .expect("Clock may have gone backwards")
1700 .as_millis() as u64,
1701 })
1702 .context("Failed to subscribe compaction event")?;
1703
1704 let stream = self
1705 .inner
1706 .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1707 request_receiver,
1708 )))
1709 .await?;
1710
1711 Ok((request_sender, Box::pin(stream)))
1712 }
1713
1714 async fn get_version_by_epoch(
1715 &self,
1716 epoch: HummockEpoch,
1717 table_id: u32,
1718 ) -> Result<PbHummockVersion> {
1719 self.get_version_by_epoch(epoch, table_id).await
1720 }
1721
1722 async fn subscribe_iceberg_compaction_event(
1723 &self,
1724 ) -> Result<(
1725 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1726 BoxStream<'static, IcebergCompactionEventItem>,
1727 )> {
1728 let (request_sender, request_receiver) =
1729 unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
1730 request_sender
1731 .send(SubscribeIcebergCompactionEventRequest {
1732 event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
1733 IcebergRegister {
1734 context_id: self.worker_id(),
1735 },
1736 )),
1737 create_at: SystemTime::now()
1738 .duration_since(SystemTime::UNIX_EPOCH)
1739 .expect("Clock may have gone backwards")
1740 .as_millis() as u64,
1741 })
1742 .context("Failed to subscribe compaction event")?;
1743
1744 let stream = self
1745 .inner
1746 .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
1747 request_receiver,
1748 )))
1749 .await?;
1750
1751 Ok((request_sender, Box::pin(stream)))
1752 }
1753}
1754
1755#[async_trait]
1756impl TelemetryInfoFetcher for MetaClient {
1757 async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
1758 let resp = self
1759 .get_telemetry_info()
1760 .await
1761 .map_err(|e| e.to_report_string())?;
1762 let tracking_id = resp.get_tracking_id().ok();
1763 Ok(tracking_id.map(|id| id.to_owned()))
1764 }
1765}
1766
1767pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
1768
1769#[derive(Debug, Clone)]
1770struct GrpcMetaClientCore {
1771 cluster_client: ClusterServiceClient<Channel>,
1772 meta_member_client: MetaMemberServiceClient<Channel>,
1773 heartbeat_client: HeartbeatServiceClient<Channel>,
1774 ddl_client: DdlServiceClient<Channel>,
1775 hummock_client: HummockManagerServiceClient<Channel>,
1776 notification_client: NotificationServiceClient<Channel>,
1777 stream_client: StreamManagerServiceClient<Channel>,
1778 user_client: UserServiceClient<Channel>,
1779 scale_client: ScaleServiceClient<Channel>,
1780 backup_client: BackupServiceClient<Channel>,
1781 telemetry_client: TelemetryInfoServiceClient<Channel>,
1782 system_params_client: SystemParamsServiceClient<Channel>,
1783 session_params_client: SessionParamServiceClient<Channel>,
1784 serving_client: ServingServiceClient<Channel>,
1785 cloud_client: CloudServiceClient<Channel>,
1786 sink_coordinate_client: SinkCoordinationRpcClient,
1787 event_log_client: EventLogServiceClient<Channel>,
1788 cluster_limit_client: ClusterLimitServiceClient<Channel>,
1789 hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
1790}
1791
1792impl GrpcMetaClientCore {
1793 pub(crate) fn new(channel: Channel) -> Self {
1794 let cluster_client = ClusterServiceClient::new(channel.clone());
1795 let meta_member_client = MetaMemberClient::new(channel.clone());
1796 let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
1797 let ddl_client =
1798 DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1799 let hummock_client =
1800 HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1801 let notification_client =
1802 NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1803 let stream_client =
1804 StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1805 let user_client = UserServiceClient::new(channel.clone());
1806 let scale_client =
1807 ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1808 let backup_client = BackupServiceClient::new(channel.clone());
1809 let telemetry_client =
1810 TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1811 let system_params_client = SystemParamsServiceClient::new(channel.clone());
1812 let session_params_client = SessionParamServiceClient::new(channel.clone());
1813 let serving_client = ServingServiceClient::new(channel.clone());
1814 let cloud_client = CloudServiceClient::new(channel.clone());
1815 let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
1816 let event_log_client = EventLogServiceClient::new(channel.clone());
1817 let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
1818 let hosted_iceberg_catalog_service_client = HostedIcebergCatalogServiceClient::new(channel);
1819
1820 GrpcMetaClientCore {
1821 cluster_client,
1822 meta_member_client,
1823 heartbeat_client,
1824 ddl_client,
1825 hummock_client,
1826 notification_client,
1827 stream_client,
1828 user_client,
1829 scale_client,
1830 backup_client,
1831 telemetry_client,
1832 system_params_client,
1833 session_params_client,
1834 serving_client,
1835 cloud_client,
1836 sink_coordinate_client,
1837 event_log_client,
1838 cluster_limit_client,
1839 hosted_iceberg_catalog_service_client,
1840 }
1841 }
1842}
1843
1844#[derive(Debug, Clone)]
1848struct GrpcMetaClient {
1849 member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
1850 core: Arc<RwLock<GrpcMetaClientCore>>,
1851}
1852
1853type MetaMemberClient = MetaMemberServiceClient<Channel>;
1854
1855struct MetaMemberGroup {
1856 members: LruCache<http::Uri, Option<MetaMemberClient>>,
1857}
1858
1859struct MetaMemberManagement {
1860 core_ref: Arc<RwLock<GrpcMetaClientCore>>,
1861 members: Either<MetaMemberClient, MetaMemberGroup>,
1862 current_leader: http::Uri,
1863 meta_config: MetaConfig,
1864}
1865
1866impl MetaMemberManagement {
1867 const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
1868
1869 fn host_address_to_uri(addr: HostAddress) -> http::Uri {
1870 format!("http://{}:{}", addr.host, addr.port)
1871 .parse()
1872 .unwrap()
1873 }
1874
1875 async fn recreate_core(&self, channel: Channel) {
1876 let mut core = self.core_ref.write().await;
1877 *core = GrpcMetaClientCore::new(channel);
1878 }
1879
    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            Either::Right(member_group) => {
                let mut fetched_members = None;

                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")?;

                        resp.into_inner().members
                    };

                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}

impl GrpcMetaClient {
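    // HTTP/2 keep-alive settings for the connection to the meta server, and the
    // base/max intervals (in milliseconds) of the jittered exponential backoff
    // used when (re)connecting.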
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;

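    /// Spawn the background member monitor. It refreshes the member list on a
    /// fixed tick (only when a member group is available, i.e. the `List`
    /// address strategy) and on demand whenever a force-refresh request
    /// arrives, and exits once the force-refresh channel is closed.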
    fn start_meta_member_monitor(
        &self,
        init_leader_addr: http::Uri,
        members: Either<MetaMemberClient, MetaMemberGroup>,
        force_refresh_receiver: Receiver<Sender<Result<()>>>,
        meta_config: MetaConfig,
    ) -> Result<()> {
        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
        let current_leader = init_leader_addr;

        let enable_period_tick = matches!(members, Either::Right(_));

        let member_management = MetaMemberManagement {
            core_ref,
            members,
            current_leader,
            meta_config,
        };

        let mut force_refresh_receiver = force_refresh_receiver;

        tokio::spawn(async move {
            let mut member_management = member_management;
            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);

            loop {
                let event: Option<Sender<Result<()>>> = if enable_period_tick {
                    tokio::select! {
                        _ = ticker.tick() => None,
                        result_sender = force_refresh_receiver.recv() => {
                            if result_sender.is_none() {
                                break;
                            }

                            result_sender
                        },
                    }
                } else {
                    let result_sender = force_refresh_receiver.recv().await;

                    if result_sender.is_none() {
                        break;
                    }

                    result_sender
                };

                let tick_result = member_management.refresh_members().await;
                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
                }

                if let Some(sender) = event {
                    let _resp = sender.send(tick_result);
                }
            }
        });

        Ok(())
    }

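    /// Ask the member monitor to refresh the leader right now and wait for the
    /// result of that refresh.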
    async fn force_refresh_leader(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.member_monitor_event_sender
            .send(sender)
            .await
            .map_err(|e| anyhow!(e))?;

        receiver.await.map_err(|e| anyhow!(e))?
    }

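    /// Connect to the meta server(s) described by `strategy`: build the initial
    /// channel, start the member monitor, and force one leader refresh before
    /// returning the client.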
    pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result<Self> {
        let (channel, addr) = match strategy {
            MetaAddressStrategy::LoadBalance(addr) => {
                Self::try_build_rpc_channel(vec![addr.clone()]).await
            }
            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
        }?;
        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
        let client = GrpcMetaClient {
            member_monitor_event_sender: force_refresh_sender,
            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
        };

        let meta_member_client = client.core.read().await.meta_member_client.clone();
        let members = match strategy {
            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
            MetaAddressStrategy::List(addrs) => {
                let mut members = LruCache::new(20);
                for addr in addrs {
                    members.put(addr.clone(), None);
                }
                members.put(addr.clone(), Some(meta_member_client));

                Either::Right(MetaMemberGroup { members })
            }
        };

        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config)?;

        client.force_refresh_leader().await?;

        Ok(client)
    }

    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
    }

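    /// Try the given addresses in order and return the first channel that
    /// connects, together with the address it connected to.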
    pub(crate) async fn try_build_rpc_channel(
        addrs: impl IntoIterator<Item = http::Uri>,
    ) -> Result<(Channel, http::Uri)> {
        let endpoints: Vec<_> = addrs
            .into_iter()
            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
            .collect();

        let mut last_error = None;

        for (endpoint, addr) in endpoints {
            match Self::connect_to_endpoint(endpoint).await {
                Ok(channel) => {
                    tracing::info!("Connect to meta server {} successfully", addr);
                    return Ok((channel, addr));
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying again",
                        addr,
                    );
                    last_error = Some(e);
                }
            }
        }

        if let Some(last_error) = last_error {
            Err(anyhow::anyhow!(last_error)
                .context("failed to connect to all meta servers")
                .into())
        } else {
            bail!("no meta server address provided")
        }
    }

    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
        let channel = endpoint
            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-meta-client", Default::default())
            .await?
            .wrapped();

        Ok(channel)
    }

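    /// Jittered exponential backoff starting at `INIT_RETRY_BASE_INTERVAL_MS`
    /// and capped at `INIT_RETRY_MAX_INTERVAL_MS`, cut off once the cumulative
    /// delay reaches `high_bound`. With `exceed == true` the last yielded delay
    /// may push the total slightly past the bound; otherwise the total stays
    /// strictly below it.
    ///
    /// Rough usage sketch (mirroring `refresh_members`; `endpoint_addr` is a
    /// placeholder `http::Uri`):
    ///
    /// ```ignore
    /// let strategy = GrpcMetaClient::retry_strategy_to_bound(Duration::from_secs(10), false);
    /// let channel = tokio_retry::Retry::spawn(strategy, || async {
    ///     GrpcMetaClient::connect_to_endpoint(GrpcMetaClient::addr_to_endpoint(endpoint_addr.clone())).await
    /// })
    /// .await?;
    /// ```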
    pub(crate) fn retry_strategy_to_bound(
        high_bound: Duration,
        exceed: bool,
    ) -> impl Iterator<Item = Duration> {
        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
            .map(jitter);

        let mut sum = Duration::default();

        iter.take_while(move |duration| {
            sum += *duration;

            if exceed {
                sum < high_bound + *duration
            } else {
                sum < high_bound
            }
        })
    }
}

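/// Table of all meta RPCs exposed by [`GrpcMetaClient`]. Each entry is
/// `{ client field, rpc method, request type, response type }`; the table is
/// consumed by `meta_rpc_client_method_impl` (see the invocation at the end of
/// this file) to generate one wrapper method per RPC.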
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse }
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse }
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
        }
    };
}

impl GrpcMetaClient {
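    /// When an RPC fails with a status code that suggests the connection or the
    /// leader is stale (`Unknown`, `Unimplemented`, `Unavailable`), request a
    /// member refresh. `try_send` on the bounded channel means at most one
    /// refresh is queued; concurrent callers simply skip.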
    async fn refresh_client_if_needed(&self, code: Code) {
        if matches!(
            code,
            Code::Unknown | Code::Unimplemented | Code::Unavailable
        ) {
            tracing::debug!("matching tonic code {}", code);
            let (result_sender, result_receiver) = oneshot::channel();
            if self
                .member_monitor_event_sender
                .try_send(result_sender)
                .is_ok()
            {
                if let Ok(Err(e)) = result_receiver.await {
                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
                }
            } else {
                tracing::debug!("skipping the current refresh, somewhere else is already doing it")
            }
        }
    }
}

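// Generate a wrapper method on `GrpcMetaClient` for every RPC listed in
// `for_all_meta_rpc!`, using the `meta_rpc_client_method_impl` rules.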
impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}