use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

type ConnectionId = u32;
type DatabaseId = u32;
type SchemaId = u32;

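/// Client to the meta service. Cloning the instance is lightweight: all clones share the same
/// underlying gRPC channel.
///
/// A minimal lifecycle sketch (addresses and config values are illustrative placeholders; not
/// compiled as a doc test):
///
/// ```ignore
/// let addr: HostAddr = "127.0.0.1:5688".parse()?;
/// let (meta_client, system_params) = MetaClient::register_new(
///     meta_addr_strategy, // e.g. parsed from the `--meta-address` option
///     WorkerType::ComputeNode,
///     &addr,
///     Property::default(),
///     &MetaConfig::default(),
/// )
/// .await;
/// meta_client.activate(&addr).await?;
/// ```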
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: u32,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: MetaConfig,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> u32 {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

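    /// Subscribes to notifications from the meta service, retrying with backoff for up to
    /// `max_heartbeat_interval_secs` before giving up.
    ///
    /// A minimal sketch (the subscriber type here is just an illustrative choice; not compiled):
    ///
    /// ```ignore
    /// let mut stream = meta_client.subscribe(SubscribeType::Frontend).await?;
    /// while let Some(notification) = stream.message().await? {
    ///     // handle the notification...
    /// }
    /// ```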
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest {
            secret_id: secret_id.into(),
        };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

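    /// Registers this worker with the meta service and fetches the initial system parameters.
    /// This never returns an error: if registration keeps failing past the retry bound, the
    /// process exits, since a worker cannot run without a meta connection.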
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: &MetaConfig,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: &MetaConfig,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
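            // Only retry when the error does not originate from the meta server itself,
            // i.e. it looks like a transient connection problem.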
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.to_owned(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

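    /// Sends a heartbeat for this worker. If the meta service reports the worker as unknown
    /// (e.g. it was expired and removed from the cluster), the process exits, unless a graceful
    /// shutdown via [`Self::unregister`] is already in progress.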
    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

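    /// Submits a `CREATE MATERIALIZED VIEW` DDL job. As with the other DDL RPCs here, the
    /// returned [`WaitVersion`] is the catalog/hummock version the caller should wait for
    /// before the new object becomes visible.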
    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest {
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        affected_table_change: Option<ReplaceJobPlan>,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            affected_table_change,
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

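    /// Alters a per-database runtime parameter (barrier interval or checkpoint frequency).
    /// The values travel in the proto `OptionalUint32`/`OptionalUint64` wrappers so that an
    /// absent value can be expressed, e.g. when restoring the cluster-wide default (an
    /// assumption based on the optional wrappers).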
    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: u32,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: u32,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(SourceId::Id),
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: u32) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(
        &self,
        sink_id: u32,
        cascade: bool,
        affected_table_change: Option<ReplaceJobPlan>,
    ) -> Result<WaitVersion> {
        let request = DropSinkRequest {
            sink_id,
            cascade,
            affected_table_change,
        };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: u32,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest {
            index_id: index_id.index_id,
            cascade,
        };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id: function_id.0,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[u32],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

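    /// Spawns a background task that heartbeats the meta service every `min_interval`. Returns
    /// the task's join handle together with a oneshot sender that stops the loop.
    ///
    /// A minimal sketch of the intended lifecycle (names are illustrative; not compiled):
    ///
    /// ```ignore
    /// let (heartbeat_handle, heartbeat_shutdown) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    /// // ... run the worker ...
    /// let _ = heartbeat_shutdown.send(());
    /// heartbeat_handle.await?;
    /// ```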
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: table_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: u32,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    pub async fn alter_sink_props(
        &self,
        sink_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

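    /// Sets a cluster-wide system parameter, returning the updated parameter set if the meta
    /// service provides one.
    ///
    /// A minimal sketch (the parameter name is illustrative; not compiled):
    ///
    /// ```ignore
    /// let params = meta_client
    ///     .set_system_param("checkpoint_frequency".to_owned(), Some("10".to_owned()))
    ///     .await?;
    /// ```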
    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
        let req = SetSystemParamRequest { param, value };
        let resp = self.inner.set_system_param(req).await?;
        Ok(resp.params.map(SystemParamsReader::from))
    }

    pub async fn get_session_params(&self) -> Result<String> {
        let req = GetSessionParamsRequest {};
        let resp = self.inner.get_session_params(req).await?;
        Ok(resp.params)
    }

    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
        let req = SetSessionParamRequest { param, value };
        let resp = self.inner.set_session_param(req).await?;
        Ok(resp.param)
    }

    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
        let req = GetDdlProgressRequest {};
        let resp = self.inner.get_ddl_progress(req).await?;
        Ok(resp.ddl_progress)
    }

    pub async fn split_compaction_group(
        &self,
        group_id: CompactionGroupId,
        table_ids_to_new_group: &[StateTableId],
        partition_vnode_count: u32,
    ) -> Result<CompactionGroupId> {
        let req = SplitCompactionGroupRequest {
            group_id,
            table_ids: table_ids_to_new_group.to_vec(),
            partition_vnode_count,
        };
        let resp = self.inner.split_compaction_group(req).await?;
        Ok(resp.new_group_id)
    }

    pub async fn get_tables(
        &self,
        table_ids: &[u32],
        include_dropped_tables: bool,
    ) -> Result<HashMap<u32, Table>> {
        let req = GetTablesRequest {
            table_ids: table_ids.to_vec(),
            include_dropped_tables,
        };
        let resp = self.inner.get_tables(req).await?;
        Ok(resp.tables)
    }

    pub async fn list_serving_vnode_mappings(
        &self,
    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
        let req = GetServingVnodeMappingsRequest {};
        let resp = self.inner.get_serving_vnode_mappings(req).await?;
        let mappings = resp
            .worker_slot_mappings
            .into_iter()
            .map(|p| {
                (
                    p.fragment_id,
                    (
                        resp.fragment_to_table
                            .get(&p.fragment_id)
                            .cloned()
                            .unwrap_or(0),
                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
                    ),
                )
            })
            .collect();
        Ok(mappings)
    }

    pub async fn risectl_list_compaction_status(
        &self,
    ) -> Result<(
        Vec<CompactStatus>,
        Vec<CompactTaskAssignment>,
        Vec<CompactTaskProgress>,
    )> {
        let req = RiseCtlListCompactionStatusRequest {};
        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
        Ok((
            resp.compaction_statuses,
            resp.task_assignment,
            resp.task_progress,
        ))
    }

    pub async fn get_compaction_score(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Result<Vec<PickerInfo>> {
        let req = GetCompactionScoreRequest {
            compaction_group_id,
        };
        let resp = self.inner.get_compaction_score(req).await?;
        Ok(resp.scores)
    }

    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
        let req = RiseCtlRebuildTableStatsRequest {};
        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
        Ok(())
    }

    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
        let req = ListBranchedObjectRequest {};
        let resp = self.inner.list_branched_object(req).await?;
        Ok(resp.branched_objects)
    }

    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
        let req = ListActiveWriteLimitRequest {};
        let resp = self.inner.list_active_write_limit(req).await?;
        Ok(resp.write_limits)
    }

    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
        let req = ListHummockMetaConfigRequest {};
        let resp = self.inner.list_hummock_meta_config(req).await?;
        Ok(resp.configs)
    }

    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
        let _resp = self
            .inner
            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
            .await?;

        Ok(())
    }

    pub async fn rw_cloud_validate_source(
        &self,
        source_type: SourceType,
        source_config: HashMap<String, String>,
    ) -> Result<RwCloudValidateSourceResponse> {
        let req = RwCloudValidateSourceRequest {
            source_type: source_type.into(),
            source_config,
        };
        let resp = self.inner.rw_cloud_validate_source(req).await?;
        Ok(resp)
    }

    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
        self.inner.core.read().await.sink_coordinate_client.clone()
    }

    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
        let req = ListCompactTaskAssignmentRequest {};
        let resp = self.inner.list_compact_task_assignment(req).await?;
        Ok(resp.task_assignment)
    }

    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
        let req = ListEventLogRequest::default();
        let resp = self.inner.list_event_log(req).await?;
        Ok(resp.event_logs)
    }

    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
        let req = ListCompactTaskProgressRequest {};
        let resp = self.inner.list_compact_task_progress(req).await?;
        Ok(resp.task_progress)
    }

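    /// Reporting panic events is a no-op under madsim simulation.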
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }

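    /// Synchronously reports a panic event to the meta service, best effort. Since this runs
    /// in the panic hook, it spawns a dedicated OS thread with its own single-threaded tokio
    /// runtime instead of relying on a tokio context that may not exist (or be unusable) at
    /// panic time, and bounds the RPC with a timeout so the hook cannot hang.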
    #[cfg(not(madsim))]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
        let event = event_log::EventWorkerNodePanic {
            worker_id: self.worker_id,
            worker_type: self.worker_type.into(),
            host_addr: Some(self.host_addr.to_protobuf()),
            panic_info: format!("{panic_info}"),
        };
        let grpc_meta_client = self.inner.clone();
        let _ = thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let req = AddEventLogRequest {
                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
            };
            rt.block_on(async {
                let _ = tokio::time::timeout(
                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
                    grpc_meta_client.add_event_log(req),
                )
                .await;
            });
        })
        .join();
    }

    pub async fn add_sink_fail_event(
        &self,
        sink_id: u32,
        sink_name: String,
        connector: String,
        error: String,
    ) -> Result<()> {
        let event = event_log::EventSinkFail {
            sink_id,
            sink_name,
            connector,
            error,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::SinkFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        let req = CancelCompactTaskRequest {
            task_id,
            task_status: task_status as _,
        };
        let resp = self.inner.cancel_compact_task(req).await?;
        Ok(resp.ret)
    }

    pub async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: u32,
    ) -> Result<PbHummockVersion> {
        let req = GetVersionByEpochRequest { epoch, table_id };
        let resp = self.inner.get_version_by_epoch(req).await?;
        Ok(resp.version.unwrap())
    }

    pub async fn get_cluster_limits(
        &self,
    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
        let req = GetClusterLimitsRequest {};
        let resp = self.inner.get_cluster_limits(req).await?;
        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
    }

    pub async fn merge_compaction_group(
        &self,
        left_group_id: CompactionGroupId,
        right_group_id: CompactionGroupId,
    ) -> Result<()> {
        let req = MergeCompactionGroupRequest {
            left_group_id,
            right_group_id,
        };
        self.inner.merge_compaction_group(req).await?;
        Ok(())
    }

    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
        let request = ListRateLimitsRequest {};
        let resp = self.inner.list_rate_limits(request).await?;
        Ok(resp.rate_limits)
    }

    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
        let request = ListIcebergTablesRequest {};
        let resp = self.inner.list_iceberg_tables(request).await?;
        Ok(resp.iceberg_tables)
    }

    pub async fn get_cluster_stack_trace(
        &self,
        actor_traces_format: ActorTracesFormat,
    ) -> Result<StackTraceResponse> {
        let request = StackTraceRequest {
            actor_traces_format: actor_traces_format as i32,
        };
        let resp = self.inner.stack_trace(request).await?;
        Ok(resp)
    }

    pub async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
        self.inner.set_sync_log_store_aligned(request).await?;
        Ok(())
    }

    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
        self.inner.refresh(request).await
    }
}

#[async_trait]
impl HummockMetaClient for MetaClient {
    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
        let req = UnpinVersionBeforeRequest {
            context_id: self.worker_id(),
            unpin_version_before: unpin_version_before.to_u64(),
        };
        self.inner.unpin_version_before(req).await?;
        Ok(())
    }

    async fn get_current_version(&self) -> Result<HummockVersion> {
        let req = GetCurrentVersionRequest::default();
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .get_current_version(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
        let resp = self
            .inner
            .get_new_object_ids(GetNewObjectIdsRequest { number })
            .await?;
        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
    }

    async fn commit_epoch_with_change_log(
        &self,
        _epoch: HummockEpoch,
        _sync_result: SyncResult,
        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
    ) -> Result<()> {
        panic!("Only meta service can commit_epoch in production.")
    }

    async fn trigger_manual_compaction(
        &self,
        compaction_group_id: u64,
        table_id: u32,
        level: u32,
        sst_ids: Vec<u64>,
    ) -> Result<()> {
        let req = TriggerManualCompactionRequest {
            compaction_group_id,
            table_id,
            level,
            sst_ids,
            ..Default::default()
        };

        self.inner.trigger_manual_compaction(req).await?;
        Ok(())
    }

    async fn trigger_full_gc(
        &self,
        sst_retention_time_sec: u64,
        prefix: Option<String>,
    ) -> Result<()> {
        self.inner
            .trigger_full_gc(TriggerFullGcRequest {
                sst_retention_time_sec,
                prefix,
            })
            .await?;
        Ok(())
    }

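    /// Opens the bidirectional compaction event stream: a `Register` event carrying this
    /// worker's id is sent first so the meta service knows who is subscribing, then the
    /// returned sender/stream pair carries subsequent compaction events in both directions.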
    async fn subscribe_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeCompactionEventRequest>,
        BoxStream<'static, CompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeCompactionEventRequest>();
        request_sender
            .send(SubscribeCompactionEventRequest {
                event: Some(subscribe_compaction_event_request::Event::Register(
                    Register {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe compaction event")?;

        let stream = self
            .inner
            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }

    async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: u32,
    ) -> Result<PbHummockVersion> {
        self.get_version_by_epoch(epoch, table_id).await
    }

    async fn subscribe_iceberg_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
        BoxStream<'static, IcebergCompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
        request_sender
            .send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
                    IcebergRegister {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe iceberg compaction event")?;

        let stream = self
            .inner
            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }
}

#[async_trait]
impl TelemetryInfoFetcher for MetaClient {
    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
        let resp = self
            .get_telemetry_info()
            .await
            .map_err(|e| e.to_report_string())?;
        let tracking_id = resp.get_tracking_id().ok();
        Ok(tracking_id.map(|id| id.to_owned()))
    }
}

pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;

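/// Per-service gRPC client stubs, all sharing one underlying [`Channel`] to the meta service.
/// Cloning is cheap: each stub just clones the shared channel handle.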
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}

impl GrpcMetaClientCore {
1940 pub(crate) fn new(channel: Channel) -> Self {
1941 let cluster_client = ClusterServiceClient::new(channel.clone());
1942 let meta_member_client = MetaMemberClient::new(channel.clone());
1943 let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
1944 let ddl_client =
1945 DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1946 let hummock_client =
1947 HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1948 let notification_client =
1949 NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1950 let stream_client =
1951 StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1952 let user_client = UserServiceClient::new(channel.clone());
1953 let scale_client =
1954 ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1955 let backup_client = BackupServiceClient::new(channel.clone());
1956 let telemetry_client =
1957 TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1958 let system_params_client = SystemParamsServiceClient::new(channel.clone());
1959 let session_params_client = SessionParamServiceClient::new(channel.clone());
1960 let serving_client = ServingServiceClient::new(channel.clone());
1961 let cloud_client = CloudServiceClient::new(channel.clone());
1962 let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
1963 let event_log_client = EventLogServiceClient::new(channel.clone());
1964 let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
1965 let hosted_iceberg_catalog_service_client =
1966 HostedIcebergCatalogServiceClient::new(channel.clone());
1967 let monitor_client = MonitorServiceClient::new(channel);
1968
1969 GrpcMetaClientCore {
1970 cluster_client,
1971 meta_member_client,
1972 heartbeat_client,
1973 ddl_client,
1974 hummock_client,
1975 notification_client,
1976 stream_client,
1977 user_client,
1978 scale_client,
1979 backup_client,
1980 telemetry_client,
1981 system_params_client,
1982 session_params_client,
1983 serving_client,
1984 cloud_client,
1985 sink_coordinate_client,
1986 event_log_client,
1987 cluster_limit_client,
1988 hosted_iceberg_catalog_service_client,
1989 monitor_client,
1990 }
1991 }
1992}
1993
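/// The underlying gRPC handle shared by `MetaClient`. The `core` sits behind an
/// `RwLock` so the member-monitor task can atomically swap in clients bound to
/// a newly discovered leader, while `member_monitor_event_sender` lets callers
/// request an immediate refresh (each request carries a oneshot sender through
/// which the refresh result is returned).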
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    core: Arc<RwLock<GrpcMetaClientCore>>,
}

type MetaMemberClient = MetaMemberServiceClient<Channel>;

/// Cached member clients keyed by advertised address; `None` marks a known
/// member that has not been connected to yet.
struct MetaMemberGroup {
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}

/// State owned by the background member-monitor task.
struct MetaMemberManagement {
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    members: Either<MetaMemberClient, MetaMemberGroup>,
    current_leader: http::Uri,
    meta_config: MetaConfig,
}

impl MetaMemberManagement {
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

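    // For example (values are illustrative), a member advertised as
    // `HostAddress { host: "meta-1", port: 5690 }` maps to the URI
    // `http://meta-1:5690`. The `unwrap` is safe as long as the advertised
    // host and port form a valid URI.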
    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
        format!("http://{}:{}", addr.host, addr.port)
            .parse()
            .unwrap()
    }

    async fn recreate_core(&self, channel: Channel) {
        let mut core = self.core_ref.write().await;
        *core = GrpcMetaClientCore::new(channel);
    }

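    /// Refreshes the member list and, if a new leader is discovered, reconnects to it.
    ///
    /// - `LoadBalance` mode (`Either::Left`): a single query against the
    ///   load-balanced endpoint.
    /// - `List` mode (`Either::Right`): walk the cached member clients until one
    ///   answers, lazily connecting entries cached as `None`, and add any newly
    ///   joined members to the LRU cache.
    ///
    /// When the reported leader differs from `current_leader`, a new channel is
    /// established with bounded retries and swapped into the shared core.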
    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            Either::Right(member_group) => {
                let mut fetched_members = None;

                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")?;

                        resp.into_inner().members
                    };

                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}

impl GrpcMetaClient {
    // See `Endpoint::http2_keep_alive_interval`.
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    // See `Endpoint::keep_alive_timeout`.
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    // Base interval (ms) of the exponential backoff used when (re)connecting to meta.
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    // Cap (ms) on a single backoff step when (re)connecting to meta.
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;

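    /// Spawns the background member-monitor task.
    ///
    /// The task refreshes members on a fixed tick only in `List` mode; in
    /// `LoadBalance` mode the load balancer is assumed to track the leader, so
    /// only forced refreshes are served. Each value received on
    /// `force_refresh_receiver` is a oneshot sender through which the refresh
    /// result is reported back, and the loop exits once all request senders are
    /// dropped.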
    fn start_meta_member_monitor(
        &self,
        init_leader_addr: http::Uri,
        members: Either<MetaMemberClient, MetaMemberGroup>,
        force_refresh_receiver: Receiver<Sender<Result<()>>>,
        meta_config: MetaConfig,
    ) -> Result<()> {
        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
        let current_leader = init_leader_addr;

        let enable_period_tick = matches!(members, Either::Right(_));

        let member_management = MetaMemberManagement {
            core_ref,
            members,
            current_leader,
            meta_config,
        };

        let mut force_refresh_receiver = force_refresh_receiver;

        tokio::spawn(async move {
            let mut member_management = member_management;
            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);

            loop {
                let event: Option<Sender<Result<()>>> = if enable_period_tick {
                    tokio::select! {
                        _ = ticker.tick() => None,
                        result_sender = force_refresh_receiver.recv() => {
                            if result_sender.is_none() {
                                break;
                            }

                            result_sender
                        },
                    }
                } else {
                    let result_sender = force_refresh_receiver.recv().await;

                    if result_sender.is_none() {
                        break;
                    }

                    result_sender
                };

                let tick_result = member_management.refresh_members().await;
                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
                }

                if let Some(sender) = event {
                    let _resp = sender.send(tick_result);
                }
            }
        });

        Ok(())
    }

    async fn force_refresh_leader(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.member_monitor_event_sender
            .send(sender)
            .await
            .map_err(|e| anyhow!(e))?;

        receiver.await.map_err(|e| anyhow!(e))?
    }

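    // A minimal construction sketch, assuming a locally running meta node and a
    // default `MetaConfig` (both placeholders, not taken from this file):
    //
    //     let strategy = MetaAddressStrategy::List(vec![
    //         "http://127.0.0.1:5690".parse().unwrap(),
    //     ]);
    //     let client = GrpcMetaClient::new(&strategy, MetaConfig::default()).await?;
    //
    // Note that `new` forces one leader refresh before returning, so a client
    // is never handed out while pointing at a known-stale leader.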
    pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result<Self> {
        let (channel, addr) = match strategy {
            MetaAddressStrategy::LoadBalance(addr) => {
                Self::try_build_rpc_channel(vec![addr.clone()]).await
            }
            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
        }?;
        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
        let client = GrpcMetaClient {
            member_monitor_event_sender: force_refresh_sender,
            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
        };

        let meta_member_client = client.core.read().await.meta_member_client.clone();
        let members = match strategy {
            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
            MetaAddressStrategy::List(addrs) => {
                let mut members = LruCache::new(20);
                for addr in addrs {
                    members.put(addr.clone(), None);
                }
                members.put(addr.clone(), Some(meta_member_client));

                Either::Right(MetaMemberGroup { members })
            }
        };

        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config)?;

        client.force_refresh_leader().await?;

        Ok(client)
    }

    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
    }

    pub(crate) async fn try_build_rpc_channel(
        addrs: impl IntoIterator<Item = http::Uri>,
    ) -> Result<(Channel, http::Uri)> {
        let endpoints: Vec<_> = addrs
            .into_iter()
            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
            .collect();

        let mut last_error = None;

        for (endpoint, addr) in endpoints {
            match Self::connect_to_endpoint(endpoint).await {
                Ok(channel) => {
                    tracing::info!("Connected to meta server {} successfully", addr);
                    return Ok((channel, addr));
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying the next address",
                        addr,
                    );
                    last_error = Some(e);
                }
            }
        }

        if let Some(last_error) = last_error {
            Err(anyhow::anyhow!(last_error)
                .context("failed to connect to all meta servers")
                .into())
        } else {
            bail!("no meta server address provided")
        }
    }

    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
        let channel = endpoint
            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-meta-client", Default::default())
            .await?
            .wrapped();

        Ok(channel)
    }

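    /// Builds a jittered exponential backoff (base `INIT_RETRY_BASE_INTERVAL_MS`,
    /// each step capped at `INIT_RETRY_MAX_INTERVAL_MS`) that stops once the
    /// *cumulative* delay reaches `high_bound`.
    ///
    /// With `exceed == false` the total sleep stays strictly below `high_bound`;
    /// with `exceed == true` one final step may cross it, since a step is kept
    /// whenever the sum *before* it is still under the bound. For example,
    /// ignoring jitter, with raw delays 10ms, 100ms, 1000ms and a bound of
    /// 150ms: `exceed == false` yields 10 + 100 = 110ms and stops, while
    /// `exceed == true` also takes the 1000ms step because 110 < 150.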
    pub(crate) fn retry_strategy_to_bound(
        high_bound: Duration,
        exceed: bool,
    ) -> impl Iterator<Item = Duration> {
        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
            .map(jitter);

        let mut sum = Duration::default();

        iter.take_while(move |duration| {
            sum += *duration;

            if exceed {
                sum < high_bound + *duration
            } else {
                sum < high_bound
            }
        })
    }
}

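// Each `{ client, method, Request, Response }` row names one RPC: the
// `GrpcMetaClientCore` field to dispatch on, the method to call, and its
// request/response types. The table is fed to `meta_rpc_client_method_impl`
// at the bottom of this file, which expands every row into an async wrapper
// roughly of this shape (a sketch, not the literal expansion):
//
//     pub async fn flush(&self, request: FlushRequest) -> Result<FlushResponse> {
//         let mut client = self.core.read().await.stream_client.clone();
//         match client.flush(request).await {
//             Ok(resp) => Ok(resp.into_inner()),
//             Err(status) => {
//                 self.refresh_client_if_needed(status.code()).await;
//                 Err(RpcError::from_meta_status(status))
//             }
//         }
//     }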
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
            { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse }
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse }
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}

impl GrpcMetaClient {
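    /// Requests a best-effort leader refresh when an RPC fails with a status
    /// code that suggests the peer may no longer be the leader (`Unknown`,
    /// `Unimplemented`, `Unavailable`). The monitor channel has capacity 1, so
    /// if a refresh is already in flight this call just logs and returns.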
    async fn refresh_client_if_needed(&self, code: Code) {
        if matches!(
            code,
            Code::Unknown | Code::Unimplemented | Code::Unavailable
        ) {
            tracing::debug!("matching tonic code {}", code);
            let (result_sender, result_receiver) = oneshot::channel();
            if self
                .member_monitor_event_sender
                .try_send(result_sender)
                .is_ok()
            {
                if let Ok(Err(e)) = result_receiver.await {
                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
                }
            } else {
                tracing::debug!("skipping this refresh, another task is already refreshing")
            }
        }
    }
}

impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}