use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

type ConnectionId = u32;
type DatabaseId = u32;
type SchemaId = u32;
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: u32,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: MetaConfig,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> u32 {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

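    /// Subscribe to notifications from the meta service, retrying with a bounded backoff
    /// until the subscription stream is established.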
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest {
            secret_id: secret_id.into(),
        };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

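    /// Register the current node to the cluster and obtain its worker id together with the
    /// initial system parameters. The process exits if registration ultimately fails.
    ///
    /// A minimal startup sketch (marked `ignore`; it assumes `addr_strategy`, `worker_addr`,
    /// `property` and `meta_config` have already been constructed by the caller):
    ///
    /// ```ignore
    /// let (meta_client, system_params) = MetaClient::register_new(
    ///     addr_strategy,
    ///     WorkerType::ComputeNode,
    ///     &worker_addr,
    ///     property,
    ///     &meta_config,
    /// )
    /// .await;
    /// meta_client.activate(&worker_addr).await?;
    /// let (heartbeat_handle, heartbeat_shutdown) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    /// ```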
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: &MetaConfig,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

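    /// The fallible body of [`MetaClient::register_new`]: connects to the meta service with the
    /// given address strategy and retries registration until it succeeds or a server-side
    /// (non-retryable) error is returned.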
    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: &MetaConfig,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.to_owned(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

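    /// Activate the current node in the cluster to confirm that it is ready to serve,
    /// retrying with a bounded backoff.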
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

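    /// Send a heartbeat signal to the meta service. If the meta service reports this worker
    /// as unknown (expired) and the client is not already shutting down, the process exits.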
    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest {
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: u32,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: u32,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(SourceId::Id),
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: u32) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: u32) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: u32,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest {
            index_id: index_id.index_id,
            cascade,
        };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id: function_id.0,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

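    /// Unregister the current node from the cluster and mark the client as shutting down,
    /// so that a subsequent "unknown worker" heartbeat response does not abort the process.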
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

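    /// Try to unregister the current worker from the cluster, logging the outcome instead of
    /// returning an error.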
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[u32],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

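    /// Starts a task that periodically sends heartbeats to the meta service, bounding each
    /// attempt by three times `min_interval`. Returns the join handle of the task together
    /// with a shutdown sender.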
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

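    /// Flush the given database and return the corresponding Hummock version id.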
    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: table_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: u32,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    pub async fn alter_sink_props(
        &self,
        sink_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
        let req = SetSystemParamRequest { param, value };
        let resp = self.inner.set_system_param(req).await?;
        Ok(resp.params.map(SystemParamsReader::from))
    }

    pub async fn get_session_params(&self) -> Result<String> {
        let req = GetSessionParamsRequest {};
        let resp = self.inner.get_session_params(req).await?;
        Ok(resp.params)
    }

    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
        let req = SetSessionParamRequest { param, value };
        let resp = self.inner.set_session_param(req).await?;
        Ok(resp.param)
    }

    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
        let req = GetDdlProgressRequest {};
        let resp = self.inner.get_ddl_progress(req).await?;
        Ok(resp.ddl_progress)
    }

    pub async fn split_compaction_group(
        &self,
        group_id: CompactionGroupId,
        table_ids_to_new_group: &[StateTableId],
        partition_vnode_count: u32,
    ) -> Result<CompactionGroupId> {
        let req = SplitCompactionGroupRequest {
            group_id,
            table_ids: table_ids_to_new_group.to_vec(),
            partition_vnode_count,
        };
        let resp = self.inner.split_compaction_group(req).await?;
        Ok(resp.new_group_id)
    }

    pub async fn get_tables(
        &self,
        table_ids: &[u32],
        include_dropped_tables: bool,
    ) -> Result<HashMap<u32, Table>> {
        let req = GetTablesRequest {
            table_ids: table_ids.to_vec(),
            include_dropped_tables,
        };
        let resp = self.inner.get_tables(req).await?;
        Ok(resp.tables)
    }

    pub async fn list_serving_vnode_mappings(
        &self,
    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
        let req = GetServingVnodeMappingsRequest {};
        let resp = self.inner.get_serving_vnode_mappings(req).await?;
        let mappings = resp
            .worker_slot_mappings
            .into_iter()
            .map(|p| {
                (
                    p.fragment_id,
                    (
                        resp.fragment_to_table
                            .get(&p.fragment_id)
                            .cloned()
                            .unwrap_or(0),
                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
                    ),
                )
            })
            .collect();
        Ok(mappings)
    }

    pub async fn risectl_list_compaction_status(
        &self,
    ) -> Result<(
        Vec<CompactStatus>,
        Vec<CompactTaskAssignment>,
        Vec<CompactTaskProgress>,
    )> {
        let req = RiseCtlListCompactionStatusRequest {};
        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
        Ok((
            resp.compaction_statuses,
            resp.task_assignment,
            resp.task_progress,
        ))
    }

    pub async fn get_compaction_score(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Result<Vec<PickerInfo>> {
        let req = GetCompactionScoreRequest {
            compaction_group_id,
        };
        let resp = self.inner.get_compaction_score(req).await?;
        Ok(resp.scores)
    }

    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
        let req = RiseCtlRebuildTableStatsRequest {};
        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
        Ok(())
    }

    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
        let req = ListBranchedObjectRequest {};
        let resp = self.inner.list_branched_object(req).await?;
        Ok(resp.branched_objects)
    }

    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
        let req = ListActiveWriteLimitRequest {};
        let resp = self.inner.list_active_write_limit(req).await?;
        Ok(resp.write_limits)
    }

    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
        let req = ListHummockMetaConfigRequest {};
        let resp = self.inner.list_hummock_meta_config(req).await?;
        Ok(resp.configs)
    }

    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
        let _resp = self
            .inner
            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
            .await?;

        Ok(())
    }

    pub async fn rw_cloud_validate_source(
        &self,
        source_type: SourceType,
        source_config: HashMap<String, String>,
    ) -> Result<RwCloudValidateSourceResponse> {
        let req = RwCloudValidateSourceRequest {
            source_type: source_type.into(),
            source_config,
        };
        let resp = self.inner.rw_cloud_validate_source(req).await?;
        Ok(resp)
    }

    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
        self.inner.core.read().await.sink_coordinate_client.clone()
    }

    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
        let req = ListCompactTaskAssignmentRequest {};
        let resp = self.inner.list_compact_task_assignment(req).await?;
        Ok(resp.task_assignment)
    }

    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
        let req = ListEventLogRequest::default();
        let resp = self.inner.list_event_log(req).await?;
        Ok(resp.event_logs)
    }

    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
        let req = ListCompactTaskProgressRequest {};
        let resp = self.inner.list_compact_task_progress(req).await?;
        Ok(resp.task_progress)
    }

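    /// No-op under madsim: panic events are not reported in simulation tests.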
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }

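    /// Add a panic event to the event log, blocking until it is sent or the timeout
    /// (default 1000 ms) elapses. A dedicated thread with its own runtime is spawned,
    /// since the panic hook may run outside of a Tokio context.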
1624 #[cfg(not(madsim))]
1626 pub fn try_add_panic_event_blocking(
1627 &self,
1628 panic_info: impl Display,
1629 timeout_millis: Option<u64>,
1630 ) {
1631 let event = event_log::EventWorkerNodePanic {
1632 worker_id: self.worker_id,
1633 worker_type: self.worker_type.into(),
1634 host_addr: Some(self.host_addr.to_protobuf()),
1635 panic_info: format!("{panic_info}"),
1636 };
1637 let grpc_meta_client = self.inner.clone();
1638 let _ = thread::spawn(move || {
1639 let rt = tokio::runtime::Builder::new_current_thread()
1640 .enable_all()
1641 .build()
1642 .unwrap();
1643 let req = AddEventLogRequest {
1644 event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1645 };
1646 rt.block_on(async {
1647 let _ = tokio::time::timeout(
1648 Duration::from_millis(timeout_millis.unwrap_or(1000)),
1649 grpc_meta_client.add_event_log(req),
1650 )
1651 .await;
1652 });
1653 })
1654 .join();
1655 }
1656
1657 pub async fn add_sink_fail_evet(
1658 &self,
1659 sink_id: u32,
1660 sink_name: String,
1661 connector: String,
1662 error: String,
1663 ) -> Result<()> {
1664 let event = event_log::EventSinkFail {
1665 sink_id,
1666 sink_name,
1667 connector,
1668 error,
1669 };
1670 let req = AddEventLogRequest {
1671 event: Some(add_event_log_request::Event::SinkFail(event)),
1672 };
1673 self.inner.add_event_log(req).await?;
1674 Ok(())
1675 }
1676
1677 pub async fn add_cdc_auto_schema_change_fail_event(
1678 &self,
1679 table_id: u32,
1680 table_name: String,
1681 cdc_table_id: String,
1682 upstream_ddl: String,
1683 fail_info: String,
1684 ) -> Result<()> {
1685 let event = event_log::EventAutoSchemaChangeFail {
1686 table_id,
1687 table_name,
1688 cdc_table_id,
1689 upstream_ddl,
1690 fail_info,
1691 };
1692 let req = AddEventLogRequest {
1693 event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1694 };
1695 self.inner.add_event_log(req).await?;
1696 Ok(())
1697 }
1698
1699 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1700 let req = CancelCompactTaskRequest {
1701 task_id,
1702 task_status: task_status as _,
1703 };
1704 let resp = self.inner.cancel_compact_task(req).await?;
1705 Ok(resp.ret)
1706 }
1707
1708 pub async fn get_version_by_epoch(
1709 &self,
1710 epoch: HummockEpoch,
1711 table_id: u32,
1712 ) -> Result<PbHummockVersion> {
1713 let req = GetVersionByEpochRequest { epoch, table_id };
1714 let resp = self.inner.get_version_by_epoch(req).await?;
1715 Ok(resp.version.unwrap())
1716 }
1717
1718 pub async fn get_cluster_limits(
1719 &self,
1720 ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1721 let req = GetClusterLimitsRequest {};
1722 let resp = self.inner.get_cluster_limits(req).await?;
1723 Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1724 }
1725
1726 pub async fn merge_compaction_group(
1727 &self,
1728 left_group_id: CompactionGroupId,
1729 right_group_id: CompactionGroupId,
1730 ) -> Result<()> {
1731 let req = MergeCompactionGroupRequest {
1732 left_group_id,
1733 right_group_id,
1734 };
1735 self.inner.merge_compaction_group(req).await?;
1736 Ok(())
1737 }
1738
1739 pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1741 let request = ListRateLimitsRequest {};
1742 let resp = self.inner.list_rate_limits(request).await?;
1743 Ok(resp.rate_limits)
1744 }
1745
1746 pub async fn list_cdc_progress(&self) -> Result<HashMap<u32, PbCdcProgress>> {
1747 let request = ListCdcProgressRequest {};
1748 let resp = self.inner.list_cdc_progress(request).await?;
1749 Ok(resp.cdc_progress)
1750 }
1751
1752 pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1753 let request = ListIcebergTablesRequest {};
1754 let resp = self.inner.list_iceberg_tables(request).await?;
1755 Ok(resp.iceberg_tables)
1756 }
1757
1758 pub async fn get_cluster_stack_trace(
1760 &self,
1761 actor_traces_format: ActorTracesFormat,
1762 ) -> Result<StackTraceResponse> {
1763 let request = StackTraceRequest {
1764 actor_traces_format: actor_traces_format as i32,
1765 };
1766 let resp = self.inner.stack_trace(request).await?;
1767 Ok(resp)
1768 }
1769
1770 pub async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
1771 let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
1772 self.inner.set_sync_log_store_aligned(request).await?;
1773 Ok(())
1774 }
1775
1776 pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
1777 self.inner.refresh(request).await
1778 }
1779}
1780
1781#[async_trait]
1782impl HummockMetaClient for MetaClient {
1783 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1784 let req = UnpinVersionBeforeRequest {
1785 context_id: self.worker_id(),
1786 unpin_version_before: unpin_version_before.to_u64(),
1787 };
1788 self.inner.unpin_version_before(req).await?;
1789 Ok(())
1790 }
1791
1792 async fn get_current_version(&self) -> Result<HummockVersion> {
1793 let req = GetCurrentVersionRequest::default();
1794 Ok(HummockVersion::from_rpc_protobuf(
1795 &self
1796 .inner
1797 .get_current_version(req)
1798 .await?
1799 .current_version
1800 .unwrap(),
1801 ))
1802 }
1803
1804 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
1805 let resp = self
1806 .inner
1807 .get_new_object_ids(GetNewObjectIdsRequest { number })
1808 .await?;
1809 Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
1810 }
1811
1812 async fn commit_epoch_with_change_log(
1813 &self,
1814 _epoch: HummockEpoch,
1815 _sync_result: SyncResult,
1816 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1817 ) -> Result<()> {
1818 panic!("Only meta service can commit_epoch in production.")
1819 }
1820
1821 async fn trigger_manual_compaction(
1822 &self,
1823 compaction_group_id: u64,
1824 table_id: u32,
1825 level: u32,
1826 sst_ids: Vec<u64>,
1827 ) -> Result<()> {
1828 let req = TriggerManualCompactionRequest {
1830 compaction_group_id,
1831 table_id,
1832 level,
1835 sst_ids,
1836 ..Default::default()
1837 };
1838
1839 self.inner.trigger_manual_compaction(req).await?;
1840 Ok(())
1841 }
1842
1843 async fn trigger_full_gc(
1844 &self,
1845 sst_retention_time_sec: u64,
1846 prefix: Option<String>,
1847 ) -> Result<()> {
1848 self.inner
1849 .trigger_full_gc(TriggerFullGcRequest {
1850 sst_retention_time_sec,
1851 prefix,
1852 })
1853 .await?;
1854 Ok(())
1855 }
1856
1857 async fn subscribe_compaction_event(
1858 &self,
1859 ) -> Result<(
1860 UnboundedSender<SubscribeCompactionEventRequest>,
1861 BoxStream<'static, CompactionEventItem>,
1862 )> {
1863 let (request_sender, request_receiver) =
1864 unbounded_channel::<SubscribeCompactionEventRequest>();
1865 request_sender
1866 .send(SubscribeCompactionEventRequest {
1867 event: Some(subscribe_compaction_event_request::Event::Register(
1868 Register {
1869 context_id: self.worker_id(),
1870 },
1871 )),
1872 create_at: SystemTime::now()
1873 .duration_since(SystemTime::UNIX_EPOCH)
1874 .expect("Clock may have gone backwards")
1875 .as_millis() as u64,
1876 })
1877 .context("Failed to subscribe compaction event")?;
1878
1879 let stream = self
1880 .inner
1881 .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1882 request_receiver,
1883 )))
1884 .await?;
1885
1886 Ok((request_sender, Box::pin(stream)))
1887 }
1888
1889 async fn get_version_by_epoch(
1890 &self,
1891 epoch: HummockEpoch,
1892 table_id: u32,
1893 ) -> Result<PbHummockVersion> {
1894 self.get_version_by_epoch(epoch, table_id).await
1895 }
1896
1897 async fn subscribe_iceberg_compaction_event(
1898 &self,
1899 ) -> Result<(
1900 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1901 BoxStream<'static, IcebergCompactionEventItem>,
1902 )> {
1903 let (request_sender, request_receiver) =
1904 unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
1905 request_sender
1906 .send(SubscribeIcebergCompactionEventRequest {
1907 event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
1908 IcebergRegister {
1909 context_id: self.worker_id(),
1910 },
1911 )),
1912 create_at: SystemTime::now()
1913 .duration_since(SystemTime::UNIX_EPOCH)
1914 .expect("Clock may have gone backwards")
1915 .as_millis() as u64,
1916 })
1917 .context("Failed to subscribe compaction event")?;

        let stream = self
            .inner
            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }
}

#[async_trait]
impl TelemetryInfoFetcher for MetaClient {
    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
        let resp = self
            .get_telemetry_info()
            .await
            .map_err(|e| e.to_report_string())?;
        let tracking_id = resp.get_tracking_id().ok();
        Ok(tracking_id.map(|id| id.to_owned()))
    }
}

pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;

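/// One typed gRPC client per meta-side service, all sharing a single tonic `Channel`. The whole
/// set is rebuilt from a fresh channel whenever a new meta leader is connected.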
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}

impl GrpcMetaClientCore {
    pub(crate) fn new(channel: Channel) -> Self {
        let cluster_client = ClusterServiceClient::new(channel.clone());
        let meta_member_client = MetaMemberClient::new(channel.clone());
        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
        let ddl_client =
            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let hummock_client =
            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let notification_client =
            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let stream_client =
            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let user_client = UserServiceClient::new(channel.clone());
        let scale_client =
            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let backup_client = BackupServiceClient::new(channel.clone());
        let telemetry_client =
            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let system_params_client = SystemParamsServiceClient::new(channel.clone());
        let session_params_client = SessionParamServiceClient::new(channel.clone());
        let serving_client = ServingServiceClient::new(channel.clone());
        let cloud_client = CloudServiceClient::new(channel.clone());
        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
        let event_log_client = EventLogServiceClient::new(channel.clone());
        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
        let hosted_iceberg_catalog_service_client =
            HostedIcebergCatalogServiceClient::new(channel.clone());
        let monitor_client = MonitorServiceClient::new(channel);

        GrpcMetaClientCore {
            cluster_client,
            meta_member_client,
            heartbeat_client,
            ddl_client,
            hummock_client,
            notification_client,
            stream_client,
            user_client,
            scale_client,
            backup_client,
            telemetry_client,
            system_params_client,
            session_params_client,
            serving_client,
            cloud_client,
            sink_coordinate_client,
            event_log_client,
            cluster_limit_client,
            hosted_iceberg_catalog_service_client,
            monitor_client,
        }
    }
}

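/// Client to the meta leader over gRPC. The inner `GrpcMetaClientCore` sits behind an `RwLock`
/// so it can be swapped for a client to a newly elected leader; `member_monitor_event_sender`
/// asks the background member monitor for an immediate refresh and reports the result back.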
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    core: Arc<RwLock<GrpcMetaClientCore>>,
}

type MetaMemberClient = MetaMemberServiceClient<Channel>;

struct MetaMemberGroup {
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}

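/// State used by the background member monitor: the shared client core to rebuild on leader
/// change, the known meta members (a single load-balanced client or an LRU group of per-address
/// clients), and the address of the leader currently in use.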
struct MetaMemberManagement {
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    members: Either<MetaMemberClient, MetaMemberGroup>,
    current_leader: http::Uri,
    meta_config: MetaConfig,
}

impl MetaMemberManagement {
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
        format!("http://{}:{}", addr.host, addr.port)
            .parse()
            .unwrap()
    }

    async fn recreate_core(&self, channel: Channel) {
        let mut core = self.core_ref.write().await;
        *core = GrpcMetaClientCore::new(channel);
    }

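    /// Refreshes the member list and, if a different leader is discovered, reconnects to it
    /// (with bounded retries) and rebuilds the client core. In load-balance mode the single
    /// member client is queried; in list mode each cached member is tried until one responds.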
    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            Either::Right(member_group) => {
                let mut fetched_members = None;

                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")?;

                        resp.into_inner().members
                    };

                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}

impl GrpcMetaClient {
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;

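    /// Spawns the background task that keeps the leader connection fresh. In list mode it also
    /// refreshes on a fixed tick; in load-balance mode it only reacts to explicit refresh
    /// requests arriving on `force_refresh_receiver`, replying on the provided oneshot sender.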
    fn start_meta_member_monitor(
        &self,
        init_leader_addr: http::Uri,
        members: Either<MetaMemberClient, MetaMemberGroup>,
        force_refresh_receiver: Receiver<Sender<Result<()>>>,
        meta_config: MetaConfig,
    ) -> Result<()> {
        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
        let current_leader = init_leader_addr;

        let enable_period_tick = matches!(members, Either::Right(_));

        let member_management = MetaMemberManagement {
            core_ref,
            members,
            current_leader,
            meta_config,
        };

        let mut force_refresh_receiver = force_refresh_receiver;

        tokio::spawn(async move {
            let mut member_management = member_management;
            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);

            loop {
                let event: Option<Sender<Result<()>>> = if enable_period_tick {
                    tokio::select! {
                        _ = ticker.tick() => None,
                        result_sender = force_refresh_receiver.recv() => {
                            if result_sender.is_none() {
                                break;
                            }

                            result_sender
                        },
                    }
                } else {
                    let result_sender = force_refresh_receiver.recv().await;

                    if result_sender.is_none() {
                        break;
                    }

                    result_sender
                };

                let tick_result = member_management.refresh_members().await;
                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
                }

                if let Some(sender) = event {
                    let _resp = sender.send(tick_result);
                }
            }
        });

        Ok(())
    }

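    /// Asks the member monitor task to refresh the leader immediately and waits for the result.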
    async fn force_refresh_leader(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.member_monitor_event_sender
            .send(sender)
            .await
            .map_err(|e| anyhow!(e))?;

        receiver.await.map_err(|e| anyhow!(e))?
    }

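    /// Connects to the meta service using the given address strategy, spawns the member monitor,
    /// and forces an initial leader refresh before returning the client.
    ///
    /// Illustrative sketch only (not compiled as a doctest); it assumes `MetaConfig::default()`
    /// is available and uses a hypothetical local meta address:
    ///
    /// ```ignore
    /// let strategy = MetaAddressStrategy::List(vec!["http://127.0.0.1:5690".parse().unwrap()]);
    /// let client = GrpcMetaClient::new(&strategy, MetaConfig::default()).await?;
    /// ```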
    pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result<Self> {
        let (channel, addr) = match strategy {
            MetaAddressStrategy::LoadBalance(addr) => {
                Self::try_build_rpc_channel(vec![addr.clone()]).await
            }
            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
        }?;
        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
        let client = GrpcMetaClient {
            member_monitor_event_sender: force_refresh_sender,
            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
        };

        let meta_member_client = client.core.read().await.meta_member_client.clone();
        let members = match strategy {
            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
            MetaAddressStrategy::List(addrs) => {
                let mut members = LruCache::new(20);
                for addr in addrs {
                    members.put(addr.clone(), None);
                }
                members.put(addr.clone(), Some(meta_member_client));

                Either::Right(MetaMemberGroup { members })
            }
        };

        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config)?;

        client.force_refresh_leader().await?;

        Ok(client)
    }

    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
    }

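    /// Tries the given addresses in order and returns the first channel that connects, together
    /// with the address it connected to; fails only if every address is unreachable.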
    pub(crate) async fn try_build_rpc_channel(
        addrs: impl IntoIterator<Item = http::Uri>,
    ) -> Result<(Channel, http::Uri)> {
        let endpoints: Vec<_> = addrs
            .into_iter()
            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
            .collect();

        let mut last_error = None;

        for (endpoint, addr) in endpoints {
            match Self::connect_to_endpoint(endpoint).await {
                Ok(channel) => {
                    tracing::info!("Connected to meta server {} successfully", addr);
                    return Ok((channel, addr));
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying next address",
                        addr,
                    );
                    last_error = Some(e);
                }
            }
        }

        if let Some(last_error) = last_error {
            Err(anyhow::anyhow!(last_error)
                .context("failed to connect to all meta servers")
                .into())
        } else {
            bail!("no meta server address provided")
        }
    }

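    /// Connects to a single endpoint with HTTP/2 keep-alive and a 5s connect timeout, wrapping
    /// the channel with connection monitoring.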
    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
        let channel = endpoint
            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-meta-client", Default::default())
            .await?
            .wrapped();

        Ok(channel)
    }

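    /// Builds a jittered exponential-backoff iterator whose total delay stays below `high_bound`.
    /// With `exceed: false` the strategy stops strictly before the bound; with `exceed: true` it
    /// may overshoot the bound by less than one final delay.
    ///
    /// Illustrative sketch (not compiled as a doctest) mirroring the call in `refresh_members`:
    ///
    /// ```ignore
    /// // `addr` is assumed to be an `http::Uri` of a meta endpoint already in scope.
    /// let strategy = GrpcMetaClient::retry_strategy_to_bound(Duration::from_secs(10), false);
    /// let channel = tokio_retry::Retry::spawn(strategy, || async {
    ///     GrpcMetaClient::connect_to_endpoint(GrpcMetaClient::addr_to_endpoint(addr.clone())).await
    /// })
    /// .await?;
    /// ```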
    pub(crate) fn retry_strategy_to_bound(
        high_bound: Duration,
        exceed: bool,
    ) -> impl Iterator<Item = Duration> {
        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
            .map(jitter);

        let mut sum = Duration::default();

        iter.take_while(move |duration| {
            sum += *duration;

            if exceed {
                sum < high_bound + *duration
            } else {
                sum < high_bound
            }
        })
    }
}

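/// Table of every meta RPC exposed by `GrpcMetaClient`, as `{ client_field, method_name,
/// RequestType, ResponseType }` entries. The caller macro passed in (here,
/// `meta_rpc_client_method_impl`) is invoked once over the whole list to generate the
/// corresponding async methods.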
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
            { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse }
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse }
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}

impl GrpcMetaClient {
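    /// Called when an RPC fails with a status code that may indicate a stale leader
    /// (`Unknown`, `Unimplemented`, or `Unavailable`): requests a one-off member refresh, unless
    /// another refresh is already in flight.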
    async fn refresh_client_if_needed(&self, code: Code) {
        if matches!(
            code,
            Code::Unknown | Code::Unimplemented | Code::Unavailable
        ) {
            tracing::debug!("matching tonic code {}", code);
            let (result_sender, result_receiver) = oneshot::channel();
            if self
                .member_monitor_event_sender
                .try_send(result_sender)
                .is_ok()
            {
                if let Ok(Err(e)) = result_receiver.await {
                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
                }
            } else {
                tracing::debug!("skipping the current refresh, somewhere else is already doing it")
            }
        }
    }
}

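/// Generates one async RPC wrapper method on `GrpcMetaClient` for every entry listed in
/// `for_all_meta_rpc`, by expanding `meta_rpc_client_method_impl` (defined elsewhere).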
impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}