1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{
33 AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
34};
35use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
36use risingwave_common::hash::WorkerSlotMapping;
37use risingwave_common::monitor::EndpointExt;
38use risingwave_common::system_param::reader::SystemParamsReader;
39use risingwave_common::telemetry::report::TelemetryInfoFetcher;
40use risingwave_common::util::addr::HostAddr;
41use risingwave_common::util::meta_addr::MetaAddressStrategy;
42use risingwave_common::util::resource_util::cpu::total_cpu_available;
43use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
44use risingwave_error::bail;
45use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
46use risingwave_hummock_sdk::compaction_group::StateTableId;
47use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
48use risingwave_hummock_sdk::{
49 CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
50};
51use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
52use risingwave_pb::backup_service::*;
53use risingwave_pb::catalog::{
54 Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
55 PbSubscription, PbTable, PbView, Table,
56};
57use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
58use risingwave_pb::cloud_service::*;
59use risingwave_pb::common::worker_node::Property;
60use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
61use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
62use risingwave_pb::ddl_service::alter_owner_request::Object;
63use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
64use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
65use risingwave_pb::ddl_service::drop_table_request::SourceId;
66use risingwave_pb::ddl_service::*;
67use risingwave_pb::hummock::compact_task::TaskStatus;
68use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
69use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
70use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
71use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
72use risingwave_pb::hummock::write_limits::WriteLimit;
73use risingwave_pb::hummock::*;
74use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
75use risingwave_pb::iceberg_compaction::{
76 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
77 subscribe_iceberg_compaction_event_request,
78};
79use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
80use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
81use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
82use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
83use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
84use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
85use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
86use risingwave_pb::meta::list_actor_states_response::ActorState;
87use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
88use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
89use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
90use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
91use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
92use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
93use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
94use risingwave_pb::meta::serving_service_client::ServingServiceClient;
95use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
96use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
97use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
98use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
99use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
100use risingwave_pb::meta::{FragmentDistribution, *};
101use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
102use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
103use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
104use risingwave_pb::secret::PbSecretRef;
105use risingwave_pb::stream_plan::StreamFragmentGraph;
106use risingwave_pb::user::update_user_request::UpdateField;
107use risingwave_pb::user::user_service_client::UserServiceClient;
108use risingwave_pb::user::*;
109use thiserror_ext::AsReport;
110use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
111use tokio::sync::oneshot::Sender;
112use tokio::sync::{RwLock, mpsc, oneshot};
113use tokio::task::JoinHandle;
114use tokio::time::{self};
115use tokio_retry::strategy::{ExponentialBackoff, jitter};
116use tokio_stream::wrappers::UnboundedReceiverStream;
117use tonic::transport::Endpoint;
118use tonic::{Code, Request, Streaming};
119
120use crate::channel::{Channel, WrappedChannelExt};
121use crate::error::{Result, RpcError};
122use crate::hummock_meta_client::{
123 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
124 IcebergCompactionEventItem,
125};
126use crate::meta_rpc_client_method_impl;
127
128type ConnectionId = u32;
129type DatabaseId = u32;
130type SchemaId = u32;
131
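/// Client to the meta service. Wraps the per-service gRPC clients and exposes
/// high-level APIs for catalog DDL, worker registration, heartbeats, scaling and
/// Hummock management.
///
/// The example below is an illustrative sketch only (not a doc test); it assumes
/// a reachable meta node and caller-provided `addr_strategy`, `host_addr`,
/// `property` and `meta_config` values:
///
/// ```ignore
/// let (meta_client, system_params) =
///     MetaClient::register_new(addr_strategy, WorkerType::Frontend, &host_addr, property, &meta_config)
///         .await;
/// meta_client.activate(&host_addr).await?;
/// let (heartbeat_handle, heartbeat_shutdown) =
///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
/// ```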
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: u32,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: MetaConfig,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}
143
144impl MetaClient {
145 pub fn worker_id(&self) -> u32 {
146 self.worker_id
147 }
148
149 pub fn host_addr(&self) -> &HostAddr {
150 &self.host_addr
151 }
152
153 pub fn worker_type(&self) -> WorkerType {
154 self.worker_type
155 }
156
157 pub fn cluster_id(&self) -> &str {
158 &self.cluster_id
159 }
160
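    /// Subscribe to notifications from the meta service, retrying with backoff
    /// until the subscription stream is established.
    ///
    /// Illustrative sketch only (assumes an already-registered `meta_client`):
    ///
    /// ```ignore
    /// let mut stream = meta_client.subscribe(SubscribeType::Frontend).await?;
    /// while let Some(notification) = stream.message().await? {
    ///     // apply the notification to the local catalog snapshot ...
    /// }
    /// ```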
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
166 let request = SubscribeRequest {
167 subscribe_type: subscribe_type as i32,
168 host: Some(self.host_addr.to_protobuf()),
169 worker_id: self.worker_id(),
170 };
171
172 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
173 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
174 true,
175 );
176
177 tokio_retry::Retry::spawn(retry_strategy, || async {
178 let request = request.clone();
179 self.inner.subscribe(request).await
180 })
181 .await
182 }
183
184 pub async fn create_connection(
185 &self,
186 connection_name: String,
187 database_id: u32,
188 schema_id: u32,
189 owner_id: u32,
190 req: create_connection_request::Payload,
191 ) -> Result<WaitVersion> {
192 let request = CreateConnectionRequest {
193 name: connection_name,
194 database_id,
195 schema_id,
196 owner_id,
197 payload: Some(req),
198 };
199 let resp = self.inner.create_connection(request).await?;
200 Ok(resp
201 .version
202 .ok_or_else(|| anyhow!("wait version not set"))?)
203 }
204
205 pub async fn create_secret(
206 &self,
207 secret_name: String,
208 database_id: u32,
209 schema_id: u32,
210 owner_id: u32,
211 value: Vec<u8>,
212 ) -> Result<WaitVersion> {
213 let request = CreateSecretRequest {
214 name: secret_name,
215 database_id,
216 schema_id,
217 owner_id,
218 value,
219 };
220 let resp = self.inner.create_secret(request).await?;
221 Ok(resp
222 .version
223 .ok_or_else(|| anyhow!("wait version not set"))?)
224 }
225
226 pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
227 let request = ListConnectionsRequest {};
228 let resp = self.inner.list_connections(request).await?;
229 Ok(resp.connections)
230 }
231
232 pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result<WaitVersion> {
233 let request = DropConnectionRequest { connection_id };
234 let resp = self.inner.drop_connection(request).await?;
235 Ok(resp
236 .version
237 .ok_or_else(|| anyhow!("wait version not set"))?)
238 }
239
240 pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
241 let request = DropSecretRequest {
242 secret_id: secret_id.into(),
243 };
244 let resp = self.inner.drop_secret(request).await?;
245 Ok(resp
246 .version
247 .ok_or_else(|| anyhow!("wait version not set"))?)
248 }
249
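    /// Register the current node with the meta service and obtain the assigned
    /// worker id together with the initial system parameters. Registration is
    /// retried with backoff; if it ultimately fails, the process exits.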
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: &MetaConfig,
    ) -> (Self, SystemParamsReader) {
260 let ret =
261 Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
262
263 match ret {
264 Ok(ret) => ret,
265 Err(err) => {
266 tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
267 std::process::exit(1);
268 }
269 }
270 }
271
272 async fn register_new_inner(
273 addr_strategy: MetaAddressStrategy,
274 worker_type: WorkerType,
275 addr: &HostAddr,
276 property: Property,
277 meta_config: &MetaConfig,
278 ) -> Result<(Self, SystemParamsReader)> {
279 tracing::info!("register meta client using strategy: {}", addr_strategy);
280
281 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
283 Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
284 true,
285 );
286
287 if property.is_unschedulable {
288 tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
289 }
290 let init_result: Result<_> = tokio_retry::RetryIf::spawn(
291 retry_strategy,
292 || async {
293 let grpc_meta_client =
294 GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
295
296 let add_worker_resp = grpc_meta_client
297 .add_worker_node(AddWorkerNodeRequest {
298 worker_type: worker_type as i32,
299 host: Some(addr.to_protobuf()),
300 property: Some(property.clone()),
301 resource: Some(risingwave_pb::common::worker_node::Resource {
302 rw_version: RW_VERSION.to_owned(),
303 total_memory_bytes: system_memory_available_bytes() as _,
304 total_cpu_cores: total_cpu_available() as _,
305 }),
306 })
307 .await
308 .context("failed to add worker node")?;
309
310 let system_params_resp = grpc_meta_client
311 .get_system_params(GetSystemParamsRequest {})
312 .await
313 .context("failed to get initial system params")?;
314
315 Ok((add_worker_resp, system_params_resp, grpc_meta_client))
316 },
317 |e: &RpcError| !e.is_from_tonic_server_impl(),
320 )
321 .await;
322
323 let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
324 let worker_id = add_worker_resp
325 .node_id
326 .expect("AddWorkerNodeResponse::node_id is empty");
327
328 let meta_client = Self {
329 worker_id,
330 worker_type,
331 host_addr: addr.clone(),
332 inner: grpc_meta_client,
333 meta_config: meta_config.to_owned(),
334 cluster_id: add_worker_resp.cluster_id,
335 shutting_down: Arc::new(false.into()),
336 };
337
        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Best effort: report the panic to the meta service, then run the default hook.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });
347
348 Ok((meta_client, system_params_resp.params.unwrap().into()))
349 }
350
    /// Activate the current node in the cluster, confirming that it is ready to serve.
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
353 let request = ActivateWorkerNodeRequest {
354 host: Some(addr.to_protobuf()),
355 node_id: self.worker_id,
356 };
357 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
358 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
359 true,
360 );
361 tokio_retry::Retry::spawn(retry_strategy, || async {
362 let request = request.clone();
363 self.inner.activate_worker_node(request).await
364 })
365 .await?;
366
367 Ok(())
368 }
369
    /// Send a heartbeat signal to the meta service.
    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
372 let request = HeartbeatRequest { node_id };
373 let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status {
            if status.code() == risingwave_pb::common::status::Code::UnknownWorker {
                // The worker is no longer known to the meta service. Unless we are already
                // shutting down, exit the process so it can be restarted and re-registered.
                if !self.shutting_down.load(Relaxed) {
                    tracing::error!(message = status.message, "worker expired");
                    std::process::exit(1);
                }
            }
        }
384 Ok(())
385 }
386
387 pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
388 let request = CreateDatabaseRequest { db: Some(db) };
389 let resp = self.inner.create_database(request).await?;
390 Ok(resp
392 .version
393 .ok_or_else(|| anyhow!("wait version not set"))?)
394 }
395
396 pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
397 let request = CreateSchemaRequest {
398 schema: Some(schema),
399 };
400 let resp = self.inner.create_schema(request).await?;
401 Ok(resp
403 .version
404 .ok_or_else(|| anyhow!("wait version not set"))?)
405 }
406
407 pub async fn create_materialized_view(
408 &self,
409 table: PbTable,
410 graph: StreamFragmentGraph,
411 dependencies: HashSet<ObjectId>,
412 specific_resource_group: Option<String>,
413 if_not_exists: bool,
414 ) -> Result<WaitVersion> {
415 let request = CreateMaterializedViewRequest {
416 materialized_view: Some(table),
417 fragment_graph: Some(graph),
418 backfill: PbBackfillType::Regular as _,
419 dependencies: dependencies.into_iter().collect(),
420 specific_resource_group,
421 if_not_exists,
422 };
423 let resp = self.inner.create_materialized_view(request).await?;
424 Ok(resp
426 .version
427 .ok_or_else(|| anyhow!("wait version not set"))?)
428 }
429
430 pub async fn drop_materialized_view(
431 &self,
432 table_id: TableId,
433 cascade: bool,
434 ) -> Result<WaitVersion> {
435 let request = DropMaterializedViewRequest {
436 table_id: table_id.table_id(),
437 cascade,
438 };
439
440 let resp = self.inner.drop_materialized_view(request).await?;
441 Ok(resp
442 .version
443 .ok_or_else(|| anyhow!("wait version not set"))?)
444 }
445
446 pub async fn create_source(
447 &self,
448 source: PbSource,
449 graph: Option<StreamFragmentGraph>,
450 if_not_exists: bool,
451 ) -> Result<WaitVersion> {
452 let request = CreateSourceRequest {
453 source: Some(source),
454 fragment_graph: graph,
455 if_not_exists,
456 };
457
458 let resp = self.inner.create_source(request).await?;
459 Ok(resp
460 .version
461 .ok_or_else(|| anyhow!("wait version not set"))?)
462 }
463
464 pub async fn create_sink(
465 &self,
466 sink: PbSink,
467 graph: StreamFragmentGraph,
468 affected_table_change: Option<ReplaceJobPlan>,
469 dependencies: HashSet<ObjectId>,
470 if_not_exists: bool,
471 ) -> Result<WaitVersion> {
472 let request = CreateSinkRequest {
473 sink: Some(sink),
474 fragment_graph: Some(graph),
475 affected_table_change,
476 dependencies: dependencies.into_iter().collect(),
477 if_not_exists,
478 };
479
480 let resp = self.inner.create_sink(request).await?;
481 Ok(resp
482 .version
483 .ok_or_else(|| anyhow!("wait version not set"))?)
484 }
485
486 pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
487 let request = CreateSubscriptionRequest {
488 subscription: Some(subscription),
489 };
490
491 let resp = self.inner.create_subscription(request).await?;
492 Ok(resp
493 .version
494 .ok_or_else(|| anyhow!("wait version not set"))?)
495 }
496
497 pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
498 let request = CreateFunctionRequest {
499 function: Some(function),
500 };
501 let resp = self.inner.create_function(request).await?;
502 Ok(resp
503 .version
504 .ok_or_else(|| anyhow!("wait version not set"))?)
505 }
506
507 pub async fn create_table(
508 &self,
509 source: Option<PbSource>,
510 table: PbTable,
511 graph: StreamFragmentGraph,
512 job_type: PbTableJobType,
513 if_not_exists: bool,
514 ) -> Result<WaitVersion> {
515 let request = CreateTableRequest {
516 materialized_view: Some(table),
517 fragment_graph: Some(graph),
518 source,
519 job_type: job_type as _,
520 if_not_exists,
521 };
522 let resp = self.inner.create_table(request).await?;
523 Ok(resp
525 .version
526 .ok_or_else(|| anyhow!("wait version not set"))?)
527 }
528
529 pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
530 let request = CommentOnRequest {
531 comment: Some(comment),
532 };
533 let resp = self.inner.comment_on(request).await?;
534 Ok(resp
535 .version
536 .ok_or_else(|| anyhow!("wait version not set"))?)
537 }
538
539 pub async fn alter_name(
540 &self,
541 object: alter_name_request::Object,
542 name: &str,
543 ) -> Result<WaitVersion> {
544 let request = AlterNameRequest {
545 object: Some(object),
546 new_name: name.to_owned(),
547 };
548 let resp = self.inner.alter_name(request).await?;
549 Ok(resp
550 .version
551 .ok_or_else(|| anyhow!("wait version not set"))?)
552 }
553
554 pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
555 let request = AlterOwnerRequest {
556 object: Some(object),
557 owner_id,
558 };
559 let resp = self.inner.alter_owner(request).await?;
560 Ok(resp
561 .version
562 .ok_or_else(|| anyhow!("wait version not set"))?)
563 }
564
565 pub async fn alter_database_param(
566 &self,
567 database_id: DatabaseId,
568 param: AlterDatabaseParam,
569 ) -> Result<WaitVersion> {
570 let request = match param {
571 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
572 let barrier_interval_ms = OptionalUint32 {
573 value: barrier_interval_ms,
574 };
575 AlterDatabaseParamRequest {
576 database_id,
577 param: Some(alter_database_param_request::Param::BarrierIntervalMs(
578 barrier_interval_ms,
579 )),
580 }
581 }
582 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
583 let checkpoint_frequency = OptionalUint64 {
584 value: checkpoint_frequency,
585 };
586 AlterDatabaseParamRequest {
587 database_id,
588 param: Some(alter_database_param_request::Param::CheckpointFrequency(
589 checkpoint_frequency,
590 )),
591 }
592 }
593 };
594 let resp = self.inner.alter_database_param(request).await?;
595 Ok(resp
596 .version
597 .ok_or_else(|| anyhow!("wait version not set"))?)
598 }
599
600 pub async fn alter_set_schema(
601 &self,
602 object: alter_set_schema_request::Object,
603 new_schema_id: u32,
604 ) -> Result<WaitVersion> {
605 let request = AlterSetSchemaRequest {
606 new_schema_id,
607 object: Some(object),
608 };
609 let resp = self.inner.alter_set_schema(request).await?;
610 Ok(resp
611 .version
612 .ok_or_else(|| anyhow!("wait version not set"))?)
613 }
614
615 pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
616 let request = AlterSourceRequest {
617 source: Some(source),
618 };
619 let resp = self.inner.alter_source(request).await?;
620 Ok(resp
621 .version
622 .ok_or_else(|| anyhow!("wait version not set"))?)
623 }
624
625 pub async fn alter_parallelism(
626 &self,
627 table_id: u32,
628 parallelism: PbTableParallelism,
629 deferred: bool,
630 ) -> Result<()> {
631 let request = AlterParallelismRequest {
632 table_id,
633 parallelism: Some(parallelism),
634 deferred,
635 };
636
637 self.inner.alter_parallelism(request).await?;
638 Ok(())
639 }
640
641 pub async fn alter_resource_group(
642 &self,
643 table_id: u32,
644 resource_group: Option<String>,
645 deferred: bool,
646 ) -> Result<()> {
647 let request = AlterResourceGroupRequest {
648 table_id,
649 resource_group,
650 deferred,
651 };
652
653 self.inner.alter_resource_group(request).await?;
654 Ok(())
655 }
656
657 pub async fn alter_swap_rename(
658 &self,
659 object: alter_swap_rename_request::Object,
660 ) -> Result<WaitVersion> {
661 let request = AlterSwapRenameRequest {
662 object: Some(object),
663 };
664 let resp = self.inner.alter_swap_rename(request).await?;
665 Ok(resp
666 .version
667 .ok_or_else(|| anyhow!("wait version not set"))?)
668 }
669
670 pub async fn alter_secret(
671 &self,
672 secret_id: u32,
673 secret_name: String,
674 database_id: u32,
675 schema_id: u32,
676 owner_id: u32,
677 value: Vec<u8>,
678 ) -> Result<WaitVersion> {
679 let request = AlterSecretRequest {
680 secret_id,
681 name: secret_name,
682 database_id,
683 schema_id,
684 owner_id,
685 value,
686 };
687 let resp = self.inner.alter_secret(request).await?;
688 Ok(resp
689 .version
690 .ok_or_else(|| anyhow!("wait version not set"))?)
691 }
692
693 pub async fn replace_job(
694 &self,
695 graph: StreamFragmentGraph,
696 replace_job: ReplaceJob,
697 ) -> Result<WaitVersion> {
698 let request = ReplaceJobPlanRequest {
699 plan: Some(ReplaceJobPlan {
700 fragment_graph: Some(graph),
701 replace_job: Some(replace_job),
702 }),
703 };
704 let resp = self.inner.replace_job_plan(request).await?;
705 Ok(resp
707 .version
708 .ok_or_else(|| anyhow!("wait version not set"))?)
709 }
710
711 pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
712 let request = AutoSchemaChangeRequest {
713 schema_change: Some(schema_change),
714 };
715 let _ = self.inner.auto_schema_change(request).await?;
716 Ok(())
717 }
718
719 pub async fn create_view(&self, view: PbView) -> Result<WaitVersion> {
720 let request = CreateViewRequest { view: Some(view) };
721 let resp = self.inner.create_view(request).await?;
722 Ok(resp
724 .version
725 .ok_or_else(|| anyhow!("wait version not set"))?)
726 }
727
728 pub async fn create_index(
729 &self,
730 index: PbIndex,
731 table: PbTable,
732 graph: StreamFragmentGraph,
733 if_not_exists: bool,
734 ) -> Result<WaitVersion> {
735 let request = CreateIndexRequest {
736 index: Some(index),
737 index_table: Some(table),
738 fragment_graph: Some(graph),
739 if_not_exists,
740 };
741 let resp = self.inner.create_index(request).await?;
742 Ok(resp
744 .version
745 .ok_or_else(|| anyhow!("wait version not set"))?)
746 }
747
748 pub async fn drop_table(
749 &self,
750 source_id: Option<u32>,
751 table_id: TableId,
752 cascade: bool,
753 ) -> Result<WaitVersion> {
754 let request = DropTableRequest {
755 source_id: source_id.map(SourceId::Id),
756 table_id: table_id.table_id(),
757 cascade,
758 };
759
760 let resp = self.inner.drop_table(request).await?;
761 Ok(resp
762 .version
763 .ok_or_else(|| anyhow!("wait version not set"))?)
764 }
765
766 pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
767 let request = DropViewRequest { view_id, cascade };
768 let resp = self.inner.drop_view(request).await?;
769 Ok(resp
770 .version
771 .ok_or_else(|| anyhow!("wait version not set"))?)
772 }
773
774 pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
775 let request = DropSourceRequest { source_id, cascade };
776 let resp = self.inner.drop_source(request).await?;
777 Ok(resp
778 .version
779 .ok_or_else(|| anyhow!("wait version not set"))?)
780 }
781
782 pub async fn drop_sink(
783 &self,
784 sink_id: u32,
785 cascade: bool,
786 affected_table_change: Option<ReplaceJobPlan>,
787 ) -> Result<WaitVersion> {
788 let request = DropSinkRequest {
789 sink_id,
790 cascade,
791 affected_table_change,
792 };
793 let resp = self.inner.drop_sink(request).await?;
794 Ok(resp
795 .version
796 .ok_or_else(|| anyhow!("wait version not set"))?)
797 }
798
799 pub async fn drop_subscription(
800 &self,
801 subscription_id: u32,
802 cascade: bool,
803 ) -> Result<WaitVersion> {
804 let request = DropSubscriptionRequest {
805 subscription_id,
806 cascade,
807 };
808 let resp = self.inner.drop_subscription(request).await?;
809 Ok(resp
810 .version
811 .ok_or_else(|| anyhow!("wait version not set"))?)
812 }
813
814 pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
815 let request = DropIndexRequest {
816 index_id: index_id.index_id,
817 cascade,
818 };
819 let resp = self.inner.drop_index(request).await?;
820 Ok(resp
821 .version
822 .ok_or_else(|| anyhow!("wait version not set"))?)
823 }
824
825 pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
826 let request = DropFunctionRequest {
827 function_id: function_id.0,
828 };
829 let resp = self.inner.drop_function(request).await?;
830 Ok(resp
831 .version
832 .ok_or_else(|| anyhow!("wait version not set"))?)
833 }
834
835 pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
836 let request = DropDatabaseRequest { database_id };
837 let resp = self.inner.drop_database(request).await?;
838 Ok(resp
839 .version
840 .ok_or_else(|| anyhow!("wait version not set"))?)
841 }
842
843 pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
844 let request = DropSchemaRequest { schema_id, cascade };
845 let resp = self.inner.drop_schema(request).await?;
846 Ok(resp
847 .version
848 .ok_or_else(|| anyhow!("wait version not set"))?)
849 }
850
851 pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
853 let request = CreateUserRequest { user: Some(user) };
854 let resp = self.inner.create_user(request).await?;
855 Ok(resp.version)
856 }
857
858 pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
859 let request = DropUserRequest { user_id };
860 let resp = self.inner.drop_user(request).await?;
861 Ok(resp.version)
862 }
863
864 pub async fn update_user(
865 &self,
866 user: UserInfo,
867 update_fields: Vec<UpdateField>,
868 ) -> Result<u64> {
869 let request = UpdateUserRequest {
870 user: Some(user),
871 update_fields: update_fields
872 .into_iter()
873 .map(|field| field as i32)
874 .collect::<Vec<_>>(),
875 };
876 let resp = self.inner.update_user(request).await?;
877 Ok(resp.version)
878 }
879
880 pub async fn grant_privilege(
881 &self,
882 user_ids: Vec<u32>,
883 privileges: Vec<GrantPrivilege>,
884 with_grant_option: bool,
885 granted_by: u32,
886 ) -> Result<u64> {
887 let request = GrantPrivilegeRequest {
888 user_ids,
889 privileges,
890 with_grant_option,
891 granted_by,
892 };
893 let resp = self.inner.grant_privilege(request).await?;
894 Ok(resp.version)
895 }
896
897 pub async fn revoke_privilege(
898 &self,
899 user_ids: Vec<u32>,
900 privileges: Vec<GrantPrivilege>,
901 granted_by: u32,
902 revoke_by: u32,
903 revoke_grant_option: bool,
904 cascade: bool,
905 ) -> Result<u64> {
906 let request = RevokePrivilegeRequest {
907 user_ids,
908 privileges,
909 granted_by,
910 revoke_by,
911 revoke_grant_option,
912 cascade,
913 };
914 let resp = self.inner.revoke_privilege(request).await?;
915 Ok(resp.version)
916 }
917
    /// Unregister the current node from the cluster and mark this client as shutting down.
    pub async fn unregister(&self) -> Result<()> {
920 let request = DeleteWorkerNodeRequest {
921 host: Some(self.host_addr.to_protobuf()),
922 };
923 self.inner.delete_worker_node(request).await?;
924 self.shutting_down.store(true, Relaxed);
925 Ok(())
926 }
927
    /// Try to unregister the current node from the cluster; failures are logged and otherwise ignored.
    pub async fn try_unregister(&self) {
930 match self.unregister().await {
931 Ok(_) => {
932 tracing::info!(
933 worker_id = self.worker_id(),
934 "successfully unregistered from meta service",
935 )
936 }
937 Err(e) => {
938 tracing::warn!(
939 error = %e.as_report(),
940 worker_id = self.worker_id(),
941 "failed to unregister from meta service",
942 );
943 }
944 }
945 }
946
947 pub async fn update_schedulability(
948 &self,
949 worker_ids: &[u32],
950 schedulability: Schedulability,
951 ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
952 let request = UpdateWorkerNodeSchedulabilityRequest {
953 worker_ids: worker_ids.to_vec(),
954 schedulability: schedulability.into(),
955 };
956 let resp = self
957 .inner
958 .update_worker_node_schedulability(request)
959 .await?;
960 Ok(resp)
961 }
962
963 pub async fn list_worker_nodes(
964 &self,
965 worker_type: Option<WorkerType>,
966 ) -> Result<Vec<WorkerNode>> {
967 let request = ListAllNodesRequest {
968 worker_type: worker_type.map(Into::into),
969 include_starting_nodes: true,
970 };
971 let resp = self.inner.list_all_nodes(request).await?;
972 Ok(resp.nodes)
973 }
974
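    /// Spawn a background task that keeps sending heartbeats to the meta service
    /// at `min_interval` until the returned shutdown sender fires.
    ///
    /// Illustrative sketch only (assumes a registered `meta_client`):
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
    /// // ... later, on shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```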
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
980 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
981 let join_handle = tokio::spawn(async move {
982 let mut min_interval_ticker = tokio::time::interval(min_interval);
983 loop {
                tokio::select! {
                    biased;
                    // Shutdown was requested: stop the heartbeat loop.
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Wait for the next heartbeat tick.
                    _ = min_interval_ticker.tick() => {},
                }
994 tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
995 match tokio::time::timeout(
996 min_interval * 3,
998 meta_client.send_heartbeat(meta_client.worker_id()),
999 )
1000 .await
1001 {
1002 Ok(Ok(_)) => {}
1003 Ok(Err(err)) => {
1004 tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
1005 }
1006 Err(_) => {
1007 tracing::warn!("Failed to send_heartbeat: timeout");
1008 }
1009 }
1010 }
1011 });
1012 (join_handle, shutdown_tx)
1013 }
1014
1015 pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1016 let request = RisectlListStateTablesRequest {};
1017 let resp = self.inner.risectl_list_state_tables(request).await?;
1018 Ok(resp.tables)
1019 }
1020
1021 pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1022 let request = FlushRequest { database_id };
1023 let resp = self.inner.flush(request).await?;
1024 Ok(HummockVersionId::new(resp.hummock_version_id))
1025 }
1026
1027 pub async fn wait(&self) -> Result<()> {
1028 let request = WaitRequest {};
1029 self.inner.wait(request).await?;
1030 Ok(())
1031 }
1032
1033 pub async fn recover(&self) -> Result<()> {
1034 let request = RecoverRequest {};
1035 self.inner.recover(request).await?;
1036 Ok(())
1037 }
1038
1039 pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1040 let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1041 let resp = self.inner.cancel_creating_jobs(request).await?;
1042 Ok(resp.canceled_jobs)
1043 }
1044
1045 pub async fn list_table_fragments(
1046 &self,
1047 table_ids: &[u32],
1048 ) -> Result<HashMap<u32, TableFragmentInfo>> {
1049 let request = ListTableFragmentsRequest {
1050 table_ids: table_ids.to_vec(),
1051 };
1052 let resp = self.inner.list_table_fragments(request).await?;
1053 Ok(resp.table_fragments)
1054 }
1055
1056 pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1057 let resp = self
1058 .inner
1059 .list_streaming_job_states(ListStreamingJobStatesRequest {})
1060 .await?;
1061 Ok(resp.states)
1062 }
1063
1064 pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
1065 let resp = self
1066 .inner
1067 .list_fragment_distribution(ListFragmentDistributionRequest {})
1068 .await?;
1069 Ok(resp.distributions)
1070 }
1071
1072 pub async fn list_creating_stream_scan_fragment_distribution(
1073 &self,
1074 ) -> Result<Vec<FragmentDistribution>> {
1075 let resp = self
1076 .inner
1077 .list_creating_stream_scan_fragment_distribution(
1078 ListCreatingStreamScanFragmentDistributionRequest {},
1079 )
1080 .await?;
1081 Ok(resp.distributions)
1082 }
1083
1084 pub async fn get_fragment_by_id(
1085 &self,
1086 fragment_id: u32,
1087 ) -> Result<Option<FragmentDistribution>> {
1088 let resp = self
1089 .inner
1090 .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1091 .await?;
1092 Ok(resp.distribution)
1093 }
1094
1095 pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1096 let resp = self
1097 .inner
1098 .list_actor_states(ListActorStatesRequest {})
1099 .await?;
1100 Ok(resp.states)
1101 }
1102
1103 pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1104 let resp = self
1105 .inner
1106 .list_actor_splits(ListActorSplitsRequest {})
1107 .await?;
1108
1109 Ok(resp.actor_splits)
1110 }
1111
1112 pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
1113 let resp = self
1114 .inner
1115 .list_object_dependencies(ListObjectDependenciesRequest {})
1116 .await?;
1117 Ok(resp.dependencies)
1118 }
1119
1120 pub async fn pause(&self) -> Result<PauseResponse> {
1121 let request = PauseRequest {};
1122 let resp = self.inner.pause(request).await?;
1123 Ok(resp)
1124 }
1125
1126 pub async fn resume(&self) -> Result<ResumeResponse> {
1127 let request = ResumeRequest {};
1128 let resp = self.inner.resume(request).await?;
1129 Ok(resp)
1130 }
1131
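    /// Apply or clear a rate limit on the given throttle target.
    ///
    /// Illustrative sketch only; the target kind and `source_id` below are
    /// placeholders chosen for the example:
    ///
    /// ```ignore
    /// // Throttle a source to 1000 rows/s, then remove the limit again.
    /// meta_client.apply_throttle(PbThrottleTarget::Source, source_id, Some(1000)).await?;
    /// meta_client.apply_throttle(PbThrottleTarget::Source, source_id, None).await?;
    /// ```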
    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
1138 let request = ApplyThrottleRequest {
1139 kind: kind as i32,
1140 id,
1141 rate,
1142 };
1143 let resp = self.inner.apply_throttle(request).await?;
1144 Ok(resp)
1145 }
1146
1147 pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1148 let resp = self
1149 .inner
1150 .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1151 .await?;
1152 Ok(resp.get_status().unwrap())
1153 }
1154
1155 pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1156 let request = GetClusterInfoRequest {};
1157 let resp = self.inner.get_cluster_info(request).await?;
1158 Ok(resp)
1159 }
1160
1161 pub async fn reschedule(
1162 &self,
1163 worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1164 revision: u64,
1165 resolve_no_shuffle_upstream: bool,
1166 ) -> Result<(bool, u64)> {
1167 let request = RescheduleRequest {
1168 revision,
1169 resolve_no_shuffle_upstream,
1170 worker_reschedules,
1171 };
1172 let resp = self.inner.reschedule(request).await?;
1173 Ok((resp.success, resp.revision))
1174 }
1175
1176 pub async fn risectl_get_pinned_versions_summary(
1177 &self,
1178 ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1179 let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1180 self.inner
1181 .rise_ctl_get_pinned_versions_summary(request)
1182 .await
1183 }
1184
1185 pub async fn risectl_get_checkpoint_hummock_version(
1186 &self,
1187 ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1188 let request = RiseCtlGetCheckpointVersionRequest {};
1189 self.inner.rise_ctl_get_checkpoint_version(request).await
1190 }
1191
1192 pub async fn risectl_pause_hummock_version_checkpoint(
1193 &self,
1194 ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1195 let request = RiseCtlPauseVersionCheckpointRequest {};
1196 self.inner.rise_ctl_pause_version_checkpoint(request).await
1197 }
1198
1199 pub async fn risectl_resume_hummock_version_checkpoint(
1200 &self,
1201 ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1202 let request = RiseCtlResumeVersionCheckpointRequest {};
1203 self.inner.rise_ctl_resume_version_checkpoint(request).await
1204 }
1205
1206 pub async fn init_metadata_for_replay(
1207 &self,
1208 tables: Vec<PbTable>,
1209 compaction_groups: Vec<CompactionGroupInfo>,
1210 ) -> Result<()> {
1211 let req = InitMetadataForReplayRequest {
1212 tables,
1213 compaction_groups,
1214 };
1215 let _resp = self.inner.init_metadata_for_replay(req).await?;
1216 Ok(())
1217 }
1218
1219 pub async fn replay_version_delta(
1220 &self,
1221 version_delta: HummockVersionDelta,
1222 ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1223 let req = ReplayVersionDeltaRequest {
1224 version_delta: Some(version_delta.into()),
1225 };
1226 let resp = self.inner.replay_version_delta(req).await?;
1227 Ok((
1228 HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1229 resp.modified_compaction_groups,
1230 ))
1231 }
1232
1233 pub async fn list_version_deltas(
1234 &self,
1235 start_id: HummockVersionId,
1236 num_limit: u32,
1237 committed_epoch_limit: HummockEpoch,
1238 ) -> Result<Vec<HummockVersionDelta>> {
1239 let req = ListVersionDeltasRequest {
1240 start_id: start_id.to_u64(),
1241 num_limit,
1242 committed_epoch_limit,
1243 };
1244 Ok(self
1245 .inner
1246 .list_version_deltas(req)
1247 .await?
1248 .version_deltas
1249 .unwrap()
1250 .version_deltas
1251 .iter()
1252 .map(HummockVersionDelta::from_rpc_protobuf)
1253 .collect())
1254 }
1255
1256 pub async fn trigger_compaction_deterministic(
1257 &self,
1258 version_id: HummockVersionId,
1259 compaction_groups: Vec<CompactionGroupId>,
1260 ) -> Result<()> {
1261 let req = TriggerCompactionDeterministicRequest {
1262 version_id: version_id.to_u64(),
1263 compaction_groups,
1264 };
1265 self.inner.trigger_compaction_deterministic(req).await?;
1266 Ok(())
1267 }
1268
1269 pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1270 let req = DisableCommitEpochRequest {};
1271 Ok(HummockVersion::from_rpc_protobuf(
1272 &self
1273 .inner
1274 .disable_commit_epoch(req)
1275 .await?
1276 .current_version
1277 .unwrap(),
1278 ))
1279 }
1280
1281 pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1282 let req = GetAssignedCompactTaskNumRequest {};
1283 let resp = self.inner.get_assigned_compact_task_num(req).await?;
1284 Ok(resp.num_tasks as usize)
1285 }
1286
1287 pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1288 let req = RiseCtlListCompactionGroupRequest {};
1289 let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1290 Ok(resp.compaction_groups)
1291 }
1292
1293 pub async fn risectl_update_compaction_config(
1294 &self,
1295 compaction_groups: &[CompactionGroupId],
1296 configs: &[MutableConfig],
1297 ) -> Result<()> {
1298 let req = RiseCtlUpdateCompactionConfigRequest {
1299 compaction_group_ids: compaction_groups.to_vec(),
1300 configs: configs
1301 .iter()
1302 .map(
1303 |c| rise_ctl_update_compaction_config_request::MutableConfig {
1304 mutable_config: Some(c.clone()),
1305 },
1306 )
1307 .collect(),
1308 };
1309 let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1310 Ok(())
1311 }
1312
1313 pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1314 let req = BackupMetaRequest { remarks };
1315 let resp = self.inner.backup_meta(req).await?;
1316 Ok(resp.job_id)
1317 }
1318
1319 pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1320 let req = GetBackupJobStatusRequest { job_id };
1321 let resp = self.inner.get_backup_job_status(req).await?;
1322 Ok((resp.job_status(), resp.message))
1323 }
1324
1325 pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1326 let req = DeleteMetaSnapshotRequest {
1327 snapshot_ids: snapshot_ids.to_vec(),
1328 };
1329 let _resp = self.inner.delete_meta_snapshot(req).await?;
1330 Ok(())
1331 }
1332
1333 pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1334 let req = GetMetaSnapshotManifestRequest {};
1335 let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1336 Ok(resp.manifest.expect("should exist"))
1337 }
1338
1339 pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1340 let req = GetTelemetryInfoRequest {};
1341 let resp = self.inner.get_telemetry_info(req).await?;
1342 Ok(resp)
1343 }
1344
1345 pub async fn get_system_params(&self) -> Result<SystemParamsReader> {
1346 let req = GetSystemParamsRequest {};
1347 let resp = self.inner.get_system_params(req).await?;
1348 Ok(resp.params.unwrap().into())
1349 }
1350
1351 pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1352 let req = GetMetaStoreInfoRequest {};
1353 let resp = self.inner.get_meta_store_info(req).await?;
1354 Ok(resp.meta_store_endpoint)
1355 }
1356
1357 pub async fn alter_sink_props(
1358 &self,
1359 sink_id: u32,
1360 changed_props: BTreeMap<String, String>,
1361 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1362 connector_conn_ref: Option<u32>,
1363 ) -> Result<()> {
1364 let req = AlterConnectorPropsRequest {
1365 object_id: sink_id,
1366 changed_props: changed_props.into_iter().collect(),
1367 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1368 connector_conn_ref,
1369 object_type: AlterConnectorPropsObject::Sink as i32,
1370 };
1371 let _resp = self.inner.alter_connector_props(req).await?;
1372 Ok(())
1373 }
1374
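    /// Set a system parameter on the meta service. Returns the updated system
    /// parameters when the meta service reports them.
    ///
    /// Illustrative sketch only:
    ///
    /// ```ignore
    /// let new_params = meta_client
    ///     .set_system_param("checkpoint_frequency".to_owned(), Some("10".to_owned()))
    ///     .await?;
    /// ```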
    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
1380 let req = SetSystemParamRequest { param, value };
1381 let resp = self.inner.set_system_param(req).await?;
1382 Ok(resp.params.map(SystemParamsReader::from))
1383 }
1384
1385 pub async fn get_session_params(&self) -> Result<String> {
1386 let req = GetSessionParamsRequest {};
1387 let resp = self.inner.get_session_params(req).await?;
1388 Ok(resp.params)
1389 }
1390
1391 pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1392 let req = SetSessionParamRequest { param, value };
1393 let resp = self.inner.set_session_param(req).await?;
1394 Ok(resp.param)
1395 }
1396
1397 pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1398 let req = GetDdlProgressRequest {};
1399 let resp = self.inner.get_ddl_progress(req).await?;
1400 Ok(resp.ddl_progress)
1401 }
1402
1403 pub async fn split_compaction_group(
1404 &self,
1405 group_id: CompactionGroupId,
1406 table_ids_to_new_group: &[StateTableId],
1407 partition_vnode_count: u32,
1408 ) -> Result<CompactionGroupId> {
1409 let req = SplitCompactionGroupRequest {
1410 group_id,
1411 table_ids: table_ids_to_new_group.to_vec(),
1412 partition_vnode_count,
1413 };
1414 let resp = self.inner.split_compaction_group(req).await?;
1415 Ok(resp.new_group_id)
1416 }
1417
1418 pub async fn get_tables(
1419 &self,
1420 table_ids: &[u32],
1421 include_dropped_tables: bool,
1422 ) -> Result<HashMap<u32, Table>> {
1423 let req = GetTablesRequest {
1424 table_ids: table_ids.to_vec(),
1425 include_dropped_tables,
1426 };
1427 let resp = self.inner.get_tables(req).await?;
1428 Ok(resp.tables)
1429 }
1430
1431 pub async fn list_serving_vnode_mappings(
1432 &self,
1433 ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
1434 let req = GetServingVnodeMappingsRequest {};
1435 let resp = self.inner.get_serving_vnode_mappings(req).await?;
1436 let mappings = resp
1437 .worker_slot_mappings
1438 .into_iter()
1439 .map(|p| {
1440 (
1441 p.fragment_id,
1442 (
1443 resp.fragment_to_table
1444 .get(&p.fragment_id)
1445 .cloned()
1446 .unwrap_or(0),
1447 WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1448 ),
1449 )
1450 })
1451 .collect();
1452 Ok(mappings)
1453 }
1454
1455 pub async fn risectl_list_compaction_status(
1456 &self,
1457 ) -> Result<(
1458 Vec<CompactStatus>,
1459 Vec<CompactTaskAssignment>,
1460 Vec<CompactTaskProgress>,
1461 )> {
1462 let req = RiseCtlListCompactionStatusRequest {};
1463 let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1464 Ok((
1465 resp.compaction_statuses,
1466 resp.task_assignment,
1467 resp.task_progress,
1468 ))
1469 }
1470
1471 pub async fn get_compaction_score(
1472 &self,
1473 compaction_group_id: CompactionGroupId,
1474 ) -> Result<Vec<PickerInfo>> {
1475 let req = GetCompactionScoreRequest {
1476 compaction_group_id,
1477 };
1478 let resp = self.inner.get_compaction_score(req).await?;
1479 Ok(resp.scores)
1480 }
1481
1482 pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1483 let req = RiseCtlRebuildTableStatsRequest {};
1484 let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1485 Ok(())
1486 }
1487
1488 pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1489 let req = ListBranchedObjectRequest {};
1490 let resp = self.inner.list_branched_object(req).await?;
1491 Ok(resp.branched_objects)
1492 }
1493
1494 pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
1495 let req = ListActiveWriteLimitRequest {};
1496 let resp = self.inner.list_active_write_limit(req).await?;
1497 Ok(resp.write_limits)
1498 }
1499
1500 pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1501 let req = ListHummockMetaConfigRequest {};
1502 let resp = self.inner.list_hummock_meta_config(req).await?;
1503 Ok(resp.configs)
1504 }
1505
1506 pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1507 let _resp = self
1508 .inner
1509 .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1510 .await?;
1511
1512 Ok(())
1513 }
1514
1515 pub async fn rw_cloud_validate_source(
1516 &self,
1517 source_type: SourceType,
1518 source_config: HashMap<String, String>,
1519 ) -> Result<RwCloudValidateSourceResponse> {
1520 let req = RwCloudValidateSourceRequest {
1521 source_type: source_type.into(),
1522 source_config,
1523 };
1524 let resp = self.inner.rw_cloud_validate_source(req).await?;
1525 Ok(resp)
1526 }
1527
1528 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1529 self.inner.core.read().await.sink_coordinate_client.clone()
1530 }
1531
1532 pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1533 let req = ListCompactTaskAssignmentRequest {};
1534 let resp = self.inner.list_compact_task_assignment(req).await?;
1535 Ok(resp.task_assignment)
1536 }
1537
1538 pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1539 let req = ListEventLogRequest::default();
1540 let resp = self.inner.list_event_log(req).await?;
1541 Ok(resp.event_logs)
1542 }
1543
1544 pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1545 let req = ListCompactTaskProgressRequest {};
1546 let resp = self.inner.list_compact_task_progress(req).await?;
1547 Ok(resp.task_progress)
1548 }
1549
1550 #[cfg(madsim)]
1551 pub fn try_add_panic_event_blocking(
1552 &self,
1553 panic_info: impl Display,
1554 timeout_millis: Option<u64>,
1555 ) {
1556 }
1557
    /// Report a worker-node panic to the meta service from a dedicated thread, blocking
    /// the caller for at most `timeout_millis` (default 1000 ms).
    #[cfg(not(madsim))]
    pub fn try_add_panic_event_blocking(
1561 &self,
1562 panic_info: impl Display,
1563 timeout_millis: Option<u64>,
1564 ) {
1565 let event = event_log::EventWorkerNodePanic {
1566 worker_id: self.worker_id,
1567 worker_type: self.worker_type.into(),
1568 host_addr: Some(self.host_addr.to_protobuf()),
1569 panic_info: format!("{panic_info}"),
1570 };
1571 let grpc_meta_client = self.inner.clone();
1572 let _ = thread::spawn(move || {
1573 let rt = tokio::runtime::Builder::new_current_thread()
1574 .enable_all()
1575 .build()
1576 .unwrap();
1577 let req = AddEventLogRequest {
1578 event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1579 };
1580 rt.block_on(async {
1581 let _ = tokio::time::timeout(
1582 Duration::from_millis(timeout_millis.unwrap_or(1000)),
1583 grpc_meta_client.add_event_log(req),
1584 )
1585 .await;
1586 });
1587 })
1588 .join();
1589 }
1590
1591 pub async fn add_sink_fail_evet(
1592 &self,
1593 sink_id: u32,
1594 sink_name: String,
1595 connector: String,
1596 error: String,
1597 ) -> Result<()> {
1598 let event = event_log::EventSinkFail {
1599 sink_id,
1600 sink_name,
1601 connector,
1602 error,
1603 };
1604 let req = AddEventLogRequest {
1605 event: Some(add_event_log_request::Event::SinkFail(event)),
1606 };
1607 self.inner.add_event_log(req).await?;
1608 Ok(())
1609 }
1610
1611 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1612 let req = CancelCompactTaskRequest {
1613 task_id,
1614 task_status: task_status as _,
1615 };
1616 let resp = self.inner.cancel_compact_task(req).await?;
1617 Ok(resp.ret)
1618 }
1619
1620 pub async fn get_version_by_epoch(
1621 &self,
1622 epoch: HummockEpoch,
1623 table_id: u32,
1624 ) -> Result<PbHummockVersion> {
1625 let req = GetVersionByEpochRequest { epoch, table_id };
1626 let resp = self.inner.get_version_by_epoch(req).await?;
1627 Ok(resp.version.unwrap())
1628 }
1629
1630 pub async fn get_cluster_limits(
1631 &self,
1632 ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1633 let req = GetClusterLimitsRequest {};
1634 let resp = self.inner.get_cluster_limits(req).await?;
1635 Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1636 }
1637
1638 pub async fn merge_compaction_group(
1639 &self,
1640 left_group_id: CompactionGroupId,
1641 right_group_id: CompactionGroupId,
1642 ) -> Result<()> {
1643 let req = MergeCompactionGroupRequest {
1644 left_group_id,
1645 right_group_id,
1646 };
1647 self.inner.merge_compaction_group(req).await?;
1648 Ok(())
1649 }
1650
1651 pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1653 let request = ListRateLimitsRequest {};
1654 let resp = self.inner.list_rate_limits(request).await?;
1655 Ok(resp.rate_limits)
1656 }
1657
1658 pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1659 let request = ListIcebergTablesRequest {};
1660 let resp = self.inner.list_iceberg_tables(request).await?;
1661 Ok(resp.iceberg_tables)
1662 }
1663
    /// Collect stack traces (await-tree dumps) from all nodes in the cluster.
    pub async fn get_cluster_stack_trace(
1666 &self,
1667 actor_traces_format: ActorTracesFormat,
1668 ) -> Result<StackTraceResponse> {
1669 let request = StackTraceRequest {
1670 actor_traces_format: actor_traces_format as i32,
1671 };
1672 let resp = self.inner.stack_trace(request).await?;
1673 Ok(resp)
1674 }
1675}
1676
1677#[async_trait]
1678impl HummockMetaClient for MetaClient {
1679 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1680 let req = UnpinVersionBeforeRequest {
1681 context_id: self.worker_id(),
1682 unpin_version_before: unpin_version_before.to_u64(),
1683 };
1684 self.inner.unpin_version_before(req).await?;
1685 Ok(())
1686 }
1687
1688 async fn get_current_version(&self) -> Result<HummockVersion> {
1689 let req = GetCurrentVersionRequest::default();
1690 Ok(HummockVersion::from_rpc_protobuf(
1691 &self
1692 .inner
1693 .get_current_version(req)
1694 .await?
1695 .current_version
1696 .unwrap(),
1697 ))
1698 }
1699
1700 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1701 let resp = self
1702 .inner
1703 .get_new_object_ids(GetNewObjectIdsRequest { number })
1704 .await?;
1705 Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1706 }
1707
1708 async fn commit_epoch_with_change_log(
1709 &self,
1710 _epoch: HummockEpoch,
1711 _sync_result: SyncResult,
1712 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1713 ) -> Result<()> {
1714 panic!("Only meta service can commit_epoch in production.")
1715 }
1716
1717 async fn trigger_manual_compaction(
1718 &self,
1719 compaction_group_id: u64,
1720 table_id: u32,
1721 level: u32,
1722 sst_ids: Vec<u64>,
1723 ) -> Result<()> {
1724 let req = TriggerManualCompactionRequest {
1726 compaction_group_id,
1727 table_id,
1728 level,
1731 sst_ids,
1732 ..Default::default()
1733 };
1734
1735 self.inner.trigger_manual_compaction(req).await?;
1736 Ok(())
1737 }
1738
1739 async fn trigger_full_gc(
1740 &self,
1741 sst_retention_time_sec: u64,
1742 prefix: Option<String>,
1743 ) -> Result<()> {
1744 self.inner
1745 .trigger_full_gc(TriggerFullGcRequest {
1746 sst_retention_time_sec,
1747 prefix,
1748 })
1749 .await?;
1750 Ok(())
1751 }
1752
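    // Opens the bidirectional compaction-event subscription: a `Register` event is
    // sent through the returned sender immediately, further requests can be pushed
    // through it, and the boxed stream yields compaction events from the meta
    // service.
    //
    // Illustrative compactor-side sketch only (assumes a registered `meta_client`
    // and `futures::StreamExt` in scope):
    //
    //     let (tx, mut event_stream) = meta_client.subscribe_compaction_event().await?;
    //     while let Some(event) = event_stream.next().await {
    //         // handle the event and report progress back through `tx` ...
    //     }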
    async fn subscribe_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeCompactionEventRequest>,
        BoxStream<'static, CompactionEventItem>,
    )> {
1759 let (request_sender, request_receiver) =
1760 unbounded_channel::<SubscribeCompactionEventRequest>();
1761 request_sender
1762 .send(SubscribeCompactionEventRequest {
1763 event: Some(subscribe_compaction_event_request::Event::Register(
1764 Register {
1765 context_id: self.worker_id(),
1766 },
1767 )),
1768 create_at: SystemTime::now()
1769 .duration_since(SystemTime::UNIX_EPOCH)
1770 .expect("Clock may have gone backwards")
1771 .as_millis() as u64,
1772 })
1773 .context("Failed to subscribe compaction event")?;
1774
1775 let stream = self
1776 .inner
1777 .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1778 request_receiver,
1779 )))
1780 .await?;
1781
1782 Ok((request_sender, Box::pin(stream)))
1783 }
1784
1785 async fn get_version_by_epoch(
1786 &self,
1787 epoch: HummockEpoch,
1788 table_id: u32,
1789 ) -> Result<PbHummockVersion> {
1790 self.get_version_by_epoch(epoch, table_id).await
1791 }
1792
1793 async fn subscribe_iceberg_compaction_event(
1794 &self,
1795 ) -> Result<(
1796 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1797 BoxStream<'static, IcebergCompactionEventItem>,
1798 )> {
1799 let (request_sender, request_receiver) =
1800 unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
1801 request_sender
1802 .send(SubscribeIcebergCompactionEventRequest {
1803 event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
1804 IcebergRegister {
1805 context_id: self.worker_id(),
1806 },
1807 )),
1808 create_at: SystemTime::now()
1809 .duration_since(SystemTime::UNIX_EPOCH)
1810 .expect("Clock may have gone backwards")
1811 .as_millis() as u64,
1812 })
1813 .context("Failed to subscribe compaction event")?;
1814
1815 let stream = self
1816 .inner
1817 .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
1818 request_receiver,
1819 )))
1820 .await?;
1821
1822 Ok((request_sender, Box::pin(stream)))
1823 }
1824}
1825
1826#[async_trait]
1827impl TelemetryInfoFetcher for MetaClient {
1828 async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
1829 let resp = self
1830 .get_telemetry_info()
1831 .await
1832 .map_err(|e| e.to_report_string())?;
1833 let tracking_id = resp.get_tracking_id().ok();
1834 Ok(tracking_id.map(|id| id.to_owned()))
1835 }
1836}
1837
1838pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
1839
1840#[derive(Debug, Clone)]
1841struct GrpcMetaClientCore {
1842 cluster_client: ClusterServiceClient<Channel>,
1843 meta_member_client: MetaMemberServiceClient<Channel>,
1844 heartbeat_client: HeartbeatServiceClient<Channel>,
1845 ddl_client: DdlServiceClient<Channel>,
1846 hummock_client: HummockManagerServiceClient<Channel>,
1847 notification_client: NotificationServiceClient<Channel>,
1848 stream_client: StreamManagerServiceClient<Channel>,
1849 user_client: UserServiceClient<Channel>,
1850 scale_client: ScaleServiceClient<Channel>,
1851 backup_client: BackupServiceClient<Channel>,
1852 telemetry_client: TelemetryInfoServiceClient<Channel>,
1853 system_params_client: SystemParamsServiceClient<Channel>,
1854 session_params_client: SessionParamServiceClient<Channel>,
1855 serving_client: ServingServiceClient<Channel>,
1856 cloud_client: CloudServiceClient<Channel>,
1857 sink_coordinate_client: SinkCoordinationRpcClient,
1858 event_log_client: EventLogServiceClient<Channel>,
1859 cluster_limit_client: ClusterLimitServiceClient<Channel>,
1860 hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
1861 monitor_client: MonitorServiceClient<Channel>,
1862}
1863
1864impl GrpcMetaClientCore {
    pub(crate) fn new(channel: Channel) -> Self {
        let cluster_client = ClusterServiceClient::new(channel.clone());
        let meta_member_client = MetaMemberClient::new(channel.clone());
        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
        let ddl_client =
            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let hummock_client =
            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let notification_client =
            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let stream_client =
            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let user_client = UserServiceClient::new(channel.clone());
        let scale_client =
            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let backup_client = BackupServiceClient::new(channel.clone());
        let telemetry_client =
            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let system_params_client = SystemParamsServiceClient::new(channel.clone());
        let session_params_client = SessionParamServiceClient::new(channel.clone());
        let serving_client = ServingServiceClient::new(channel.clone());
        let cloud_client = CloudServiceClient::new(channel.clone());
        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
        let event_log_client = EventLogServiceClient::new(channel.clone());
        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
        let hosted_iceberg_catalog_service_client =
            HostedIcebergCatalogServiceClient::new(channel.clone());
        let monitor_client = MonitorServiceClient::new(channel);

        GrpcMetaClientCore {
            cluster_client,
            meta_member_client,
            heartbeat_client,
            ddl_client,
            hummock_client,
            notification_client,
            stream_client,
            user_client,
            scale_client,
            backup_client,
            telemetry_client,
            system_params_client,
            session_params_client,
            serving_client,
            cloud_client,
            sink_coordinate_client,
            event_log_client,
            cluster_limit_client,
            hosted_iceberg_catalog_service_client,
            monitor_client,
        }
    }
}

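/// Low-level client to the meta server. Holds the shared [`GrpcMetaClientCore`] behind an
/// `RwLock` so the core can be swapped on leader change, plus a bounded channel used to ask
/// the background member monitor for an immediate leader refresh.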
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    core: Arc<RwLock<GrpcMetaClientCore>>,
}

type MetaMemberClient = MetaMemberServiceClient<Channel>;

struct MetaMemberGroup {
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}

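/// State owned by the background member-monitor task: a handle to the shared client core,
/// either a single member client (load-balance mode) or an LRU cache of per-address member
/// clients (list mode), and the address of the leader currently in use.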
struct MetaMemberManagement {
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    members: Either<MetaMemberClient, MetaMemberGroup>,
    current_leader: http::Uri,
    meta_config: MetaConfig,
}

impl MetaMemberManagement {
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
        format!("http://{}:{}", addr.host, addr.port)
            .parse()
            .unwrap()
    }

    async fn recreate_core(&self, channel: Channel) {
        let mut core = self.core_ref.write().await;
        *core = GrpcMetaClientCore::new(channel);
    }

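    /// Queries the `Members` RPC to find the current leader. In load-balance mode the single
    /// cached client is asked; in list mode each known address is tried in turn until one
    /// responds, newly reported members are added to the cache, and the leader is picked from
    /// that response. If the discovered leader differs from `current_leader`, reconnect with
    /// bounded retries and rebuild the client core on the new channel.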
    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            Either::Right(member_group) => {
                let mut fetched_members = None;

                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")?;

                        resp.into_inner().members
                    };

                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}

impl GrpcMetaClient {
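    // Keep-alive settings for meta endpoints, and the base/cap intervals (in milliseconds)
    // for the exponential retry backoff used when (re)connecting.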
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;

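    /// Spawns the background member-monitor task. In list mode it refreshes membership every
    /// `META_MEMBER_REFRESH_PERIOD` and additionally on demand; in load-balance mode it only
    /// refreshes when a caller sends a oneshot sender through `force_refresh_receiver`. The
    /// task exits once the sending side of that channel is dropped.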
    fn start_meta_member_monitor(
        &self,
        init_leader_addr: http::Uri,
        members: Either<MetaMemberClient, MetaMemberGroup>,
        force_refresh_receiver: Receiver<Sender<Result<()>>>,
        meta_config: MetaConfig,
    ) -> Result<()> {
        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
        let current_leader = init_leader_addr;

        let enable_period_tick = matches!(members, Either::Right(_));

        let member_management = MetaMemberManagement {
            core_ref,
            members,
            current_leader,
            meta_config,
        };

        let mut force_refresh_receiver = force_refresh_receiver;

        tokio::spawn(async move {
            let mut member_management = member_management;
            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);

            loop {
                let event: Option<Sender<Result<()>>> = if enable_period_tick {
                    tokio::select! {
                        _ = ticker.tick() => None,
                        result_sender = force_refresh_receiver.recv() => {
                            if result_sender.is_none() {
                                break;
                            }

                            result_sender
                        },
                    }
                } else {
                    let result_sender = force_refresh_receiver.recv().await;

                    if result_sender.is_none() {
                        break;
                    }

                    result_sender
                };

                let tick_result = member_management.refresh_members().await;
                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
                }

                if let Some(sender) = event {
                    let _resp = sender.send(tick_result);
                }
            }
        });

        Ok(())
    }

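    /// Asks the member monitor to refresh the leader immediately and waits for the result of
    /// that refresh.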
    async fn force_refresh_leader(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.member_monitor_event_sender
            .send(sender)
            .await
            .map_err(|e| anyhow!(e))?;

        receiver.await.map_err(|e| anyhow!(e))?
    }

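    /// Connects to the meta server(s) described by `strategy`, starts the member monitor, and
    /// performs one forced leader refresh before returning.
    ///
    /// Illustrative sketch only (not taken from this crate's tests); it assumes
    /// `MetaAddressStrategy::List` wraps a `Vec<http::Uri>`, that `MetaConfig` implements
    /// `Default`, and that a meta node is listening at the hypothetical address below:
    ///
    /// ```ignore
    /// let strategy = MetaAddressStrategy::List(vec!["http://127.0.0.1:5690".parse()?]);
    /// let client = GrpcMetaClient::new(&strategy, MetaConfig::default()).await?;
    /// ```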
    pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result<Self> {
        let (channel, addr) = match strategy {
            MetaAddressStrategy::LoadBalance(addr) => {
                Self::try_build_rpc_channel(vec![addr.clone()]).await
            }
            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
        }?;
        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
        let client = GrpcMetaClient {
            member_monitor_event_sender: force_refresh_sender,
            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
        };

        let meta_member_client = client.core.read().await.meta_member_client.clone();
        let members = match strategy {
            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
            MetaAddressStrategy::List(addrs) => {
                let mut members = LruCache::new(20);
                for addr in addrs {
                    members.put(addr.clone(), None);
                }
                members.put(addr.clone(), Some(meta_member_client));

                Either::Right(MetaMemberGroup { members })
            }
        };

        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config)?;

        client.force_refresh_leader().await?;

        Ok(client)
    }

    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
    }

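    // Tries each address in order and returns the first channel that connects, together with
    // the address it connected to; otherwise surfaces the last connection error, or bails if
    // no address was given at all.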
    pub(crate) async fn try_build_rpc_channel(
        addrs: impl IntoIterator<Item = http::Uri>,
    ) -> Result<(Channel, http::Uri)> {
        let endpoints: Vec<_> = addrs
            .into_iter()
            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
            .collect();

        let mut last_error = None;

        for (endpoint, addr) in endpoints {
            match Self::connect_to_endpoint(endpoint).await {
                Ok(channel) => {
                    tracing::info!("Connect to meta server {} successfully", addr);
                    return Ok((channel, addr));
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying again",
                        addr,
                    );
                    last_error = Some(e);
                }
            }
        }

        if let Some(last_error) = last_error {
            Err(anyhow::anyhow!(last_error)
                .context("failed to connect to all meta servers")
                .into())
        } else {
            bail!("no meta server address provided")
        }
    }

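    // Dials a single endpoint with HTTP/2 keep-alive (60s interval/timeout) and a 5-second
    // connect timeout, using the monitored connector labeled "grpc-meta-client".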
    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
        let channel = endpoint
            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-meta-client", Default::default())
            .await?
            .wrapped();

        Ok(channel)
    }

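    /// Exponential backoff (base 10ms, capped at 2s, jittered) whose yielded delays stop once
    /// their running sum reaches `high_bound`. With `exceed == false` the total sleep stays
    /// strictly below `high_bound`; with `exceed == true` the one delay that crosses the bound
    /// is still yielded.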
    pub(crate) fn retry_strategy_to_bound(
        high_bound: Duration,
        exceed: bool,
    ) -> impl Iterator<Item = Duration> {
        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
            .map(jitter);

        let mut sum = Duration::default();

        iter.take_while(move |duration| {
            sum += *duration;

            if exceed {
                sum < high_bound + *duration
            } else {
                sum < high_bound
            }
        })
    }
}

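// Table of { client field, RPC method, request type, response type } entries, consumed by a
// companion macro (`meta_rpc_client_method_impl`, invoked at the bottom of this file) to
// generate one forwarding method on `GrpcMetaClient` per meta RPC.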
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
            { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_stream_scan_fragment_distribution, ListCreatingStreamScanFragmentDistributionRequest, ListCreatingStreamScanFragmentDistributionResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse }
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse }
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}

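// Error-driven leader refresh: on `Unknown`, `Unimplemented`, or `Unavailable` status codes,
// request a refresh from the member monitor. `try_send` on the bounded channel means a refresh
// already in flight is not queued up again.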
impl GrpcMetaClient {
    async fn refresh_client_if_needed(&self, code: Code) {
        if matches!(
            code,
            Code::Unknown | Code::Unimplemented | Code::Unavailable
        ) {
            tracing::debug!("matching tonic code {}", code);
            let (result_sender, result_receiver) = oneshot::channel();
            if self
                .member_monitor_event_sender
                .try_send(result_sender)
                .is_ok()
            {
                if let Ok(Err(e)) = result_receiver.await {
                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
                }
            } else {
                tracing::debug!("skipping the current refresh, somewhere else is already doing it")
            }
        }
    }
}

impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}