use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

type ConnectionId = u32;
type DatabaseId = u32;
type SchemaId = u32;

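/// Client to the meta service, holding the identity this worker registered
/// with (id, type, host address) alongside the underlying gRPC client.
/// Clones share the same `GrpcMetaClient`, so the client is cheap to pass
/// around.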
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: u32,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: MetaConfig,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> u32 {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

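    /// Subscribes to notification events from the meta service, retrying with
    /// a backoff bounded by `max_heartbeat_interval_secs` until the stream is
    /// established.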
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result<WaitVersion> {
        let request = DropConnectionRequest { connection_id };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
        let request = DropSecretRequest {
            secret_id: secret_id.into(),
        };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

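    /// Registers this node with the meta service and returns the client
    /// together with the initial system params. Exits the process if
    /// registration ultimately fails, since a worker cannot run unregistered.
    ///
    /// A minimal usage sketch (illustrative only; `strategy`, `addr`, and
    /// `config` are assumed to be built elsewhere, e.g. from CLI options):
    ///
    /// ```ignore
    /// let (meta_client, system_params) = MetaClient::register_new(
    ///     strategy,                 // MetaAddressStrategy
    ///     WorkerType::ComputeNode,  // role of this worker
    ///     &addr,                    // HostAddr this worker serves on
    ///     Property::default(),      // scheduling properties
    ///     &config,                  // MetaConfig
    /// )
    /// .await;
    /// meta_client.activate(&addr).await?;
    /// ```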
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: &MetaConfig,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: &MetaConfig,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        if property.is_unschedulable {
            tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
        }
        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.to_owned(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

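    /// Sends a single heartbeat. If the meta service reports this worker as
    /// unknown (i.e. it has been expired) and we are not already shutting
    /// down, the process exits, as an expired worker cannot rejoin without
    /// re-registering.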
    pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
        let request = HeartbeatRequest { node_id };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            backfill: PbBackfillType::Regular as _,
            dependencies: dependencies.into_iter().collect(),
            specific_resource_group,
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest {
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        affected_table_change: Option<ReplaceJobPlan>,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            affected_table_change,
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

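    /// Alters a per-database runtime parameter (currently the barrier
    /// interval or checkpoint frequency). Values are wrapped in
    /// `OptionalUint32`/`OptionalUint64` so that an unset value is
    /// distinguishable from zero on the wire.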
    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        table_id: u32,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: u32,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: u32,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(SourceId::Id),
            table_id: table_id.table_id(),
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(
        &self,
        sink_id: u32,
        cascade: bool,
        affected_table_change: Option<ReplaceJobPlan>,
    ) -> Result<WaitVersion> {
        let request = DropSinkRequest {
            sink_id,
            cascade,
            affected_table_change,
        };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: u32,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest {
            index_id: index_id.index_id,
            cascade,
        };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id: function_id.0,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: u32,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<u32>,
        privileges: Vec<GrantPrivilege>,
        granted_by: u32,
        revoke_by: u32,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<u32>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: u32,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

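    /// Unregisters this worker from the meta service and marks the client as
    /// shutting down, so that an in-flight heartbeat observing "unknown
    /// worker" does not kill the process.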
    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: &[u32],
        schedulability: Schedulability,
    ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
        let request = UpdateWorkerNodeSchedulabilityRequest {
            worker_ids: worker_ids.to_vec(),
            schedulability: schedulability.into(),
        };
        let resp = self
            .inner
            .update_worker_node_schedulability(request)
            .await?;
        Ok(resp)
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

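    /// Spawns a background task that sends a heartbeat at least every
    /// `min_interval`, with each attempt capped at `min_interval * 3`.
    /// Returns the task's join handle and a oneshot sender that stops the
    /// loop.
    ///
    /// A minimal sketch of the intended usage (interval chosen arbitrarily):
    ///
    /// ```ignore
    /// let (heartbeat_handle, heartbeat_shutdown) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    /// // ... later, on shutdown:
    /// let _ = heartbeat_shutdown.send(());
    /// heartbeat_handle.await?;
    /// ```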
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    min_interval * 3,
                    meta_client.send_heartbeat(meta_client.worker_id()),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(HummockVersionId::new(resp.hummock_version_id))
    }

    pub async fn wait(&self) -> Result<()> {
        let request = WaitRequest {};
        self.inner.wait(request).await?;
        Ok(())
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        table_ids: &[u32],
    ) -> Result<HashMap<u32, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: table_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {})
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: u32,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
        let resp = self
            .inner
            .list_object_dependencies(ListObjectDependenciesRequest {})
            .await?;
        Ok(resp.dependencies)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

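    /// Applies a rate limit to the given throttle target (identified by
    /// `kind` and `id`); a `rate` of `None` presumably lifts the limit.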
    pub async fn apply_throttle(
        &self,
        kind: PbThrottleTarget,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            kind: kind as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id: start_id.to_u64(),
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id: version_id.to_u64(),
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    pub async fn alter_sink_props(
        &self,
        sink_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<u32>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
        let req = SetSystemParamRequest { param, value };
        let resp = self.inner.set_system_param(req).await?;
        Ok(resp.params.map(SystemParamsReader::from))
    }

    pub async fn get_session_params(&self) -> Result<String> {
        let req = GetSessionParamsRequest {};
        let resp = self.inner.get_session_params(req).await?;
        Ok(resp.params)
    }

    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
        let req = SetSessionParamRequest { param, value };
        let resp = self.inner.set_session_param(req).await?;
        Ok(resp.param)
    }

    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
        let req = GetDdlProgressRequest {};
        let resp = self.inner.get_ddl_progress(req).await?;
        Ok(resp.ddl_progress)
    }

    pub async fn split_compaction_group(
        &self,
        group_id: CompactionGroupId,
        table_ids_to_new_group: &[StateTableId],
        partition_vnode_count: u32,
    ) -> Result<CompactionGroupId> {
        let req = SplitCompactionGroupRequest {
            group_id,
            table_ids: table_ids_to_new_group.to_vec(),
            partition_vnode_count,
        };
        let resp = self.inner.split_compaction_group(req).await?;
        Ok(resp.new_group_id)
    }

    pub async fn get_tables(
        &self,
        table_ids: &[u32],
        include_dropped_tables: bool,
    ) -> Result<HashMap<u32, Table>> {
        let req = GetTablesRequest {
            table_ids: table_ids.to_vec(),
            include_dropped_tables,
        };
        let resp = self.inner.get_tables(req).await?;
        Ok(resp.tables)
    }

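    /// Fetches the serving vnode mappings as a map from fragment id to
    /// `(table_id, mapping)`. Fragments missing from the server's
    /// fragment-to-table index fall back to table id 0.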
    pub async fn list_serving_vnode_mappings(
        &self,
    ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
        let req = GetServingVnodeMappingsRequest {};
        let resp = self.inner.get_serving_vnode_mappings(req).await?;
        let mappings = resp
            .worker_slot_mappings
            .into_iter()
            .map(|p| {
                (
                    p.fragment_id,
                    (
                        resp.fragment_to_table
                            .get(&p.fragment_id)
                            .cloned()
                            .unwrap_or(0),
                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
                    ),
                )
            })
            .collect();
        Ok(mappings)
    }

    pub async fn risectl_list_compaction_status(
        &self,
    ) -> Result<(
        Vec<CompactStatus>,
        Vec<CompactTaskAssignment>,
        Vec<CompactTaskProgress>,
    )> {
        let req = RiseCtlListCompactionStatusRequest {};
        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
        Ok((
            resp.compaction_statuses,
            resp.task_assignment,
            resp.task_progress,
        ))
    }

    pub async fn get_compaction_score(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Result<Vec<PickerInfo>> {
        let req = GetCompactionScoreRequest {
            compaction_group_id,
        };
        let resp = self.inner.get_compaction_score(req).await?;
        Ok(resp.scores)
    }

    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
        let req = RiseCtlRebuildTableStatsRequest {};
        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
        Ok(())
    }

    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
        let req = ListBranchedObjectRequest {};
        let resp = self.inner.list_branched_object(req).await?;
        Ok(resp.branched_objects)
    }

    pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
        let req = ListActiveWriteLimitRequest {};
        let resp = self.inner.list_active_write_limit(req).await?;
        Ok(resp.write_limits)
    }

    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
        let req = ListHummockMetaConfigRequest {};
        let resp = self.inner.list_hummock_meta_config(req).await?;
        Ok(resp.configs)
    }

    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
        let _resp = self
            .inner
            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
            .await?;

        Ok(())
    }

    pub async fn rw_cloud_validate_source(
        &self,
        source_type: SourceType,
        source_config: HashMap<String, String>,
    ) -> Result<RwCloudValidateSourceResponse> {
        let req = RwCloudValidateSourceRequest {
            source_type: source_type.into(),
            source_config,
        };
        let resp = self.inner.rw_cloud_validate_source(req).await?;
        Ok(resp)
    }

    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
        self.inner.core.read().await.sink_coordinate_client.clone()
    }

    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
        let req = ListCompactTaskAssignmentRequest {};
        let resp = self.inner.list_compact_task_assignment(req).await?;
        Ok(resp.task_assignment)
    }

    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
        let req = ListEventLogRequest::default();
        let resp = self.inner.list_event_log(req).await?;
        Ok(resp.event_logs)
    }

    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
        let req = ListCompactTaskProgressRequest {};
        let resp = self.inner.list_compact_task_progress(req).await?;
        Ok(resp.task_progress)
    }

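    /// Best-effort, blocking report of a panic to the meta event log, invoked
    /// from the panic hook installed in `register_new_inner`. Under madsim
    /// (deterministic simulation) this is a no-op.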
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }

    #[cfg(not(madsim))]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
        let event = event_log::EventWorkerNodePanic {
            worker_id: self.worker_id,
            worker_type: self.worker_type.into(),
            host_addr: Some(self.host_addr.to_protobuf()),
            panic_info: format!("{panic_info}"),
        };
        let grpc_meta_client = self.inner.clone();
        let _ = thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let req = AddEventLogRequest {
                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
            };
            rt.block_on(async {
                let _ = tokio::time::timeout(
                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
                    grpc_meta_client.add_event_log(req),
                )
                .await;
            });
        })
        .join();
    }

    pub async fn add_sink_fail_event(
        &self,
        sink_id: u32,
        sink_name: String,
        connector: String,
        error: String,
    ) -> Result<()> {
        let event = event_log::EventSinkFail {
            sink_id,
            sink_name,
            connector,
            error,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::SinkFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        let req = CancelCompactTaskRequest {
            task_id,
            task_status: task_status as _,
        };
        let resp = self.inner.cancel_compact_task(req).await?;
        Ok(resp.ret)
    }

    pub async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: u32,
    ) -> Result<PbHummockVersion> {
        let req = GetVersionByEpochRequest { epoch, table_id };
        let resp = self.inner.get_version_by_epoch(req).await?;
        Ok(resp.version.unwrap())
    }

    pub async fn get_cluster_limits(
        &self,
    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
        let req = GetClusterLimitsRequest {};
        let resp = self.inner.get_cluster_limits(req).await?;
        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
    }

    pub async fn merge_compaction_group(
        &self,
        left_group_id: CompactionGroupId,
        right_group_id: CompactionGroupId,
    ) -> Result<()> {
        let req = MergeCompactionGroupRequest {
            left_group_id,
            right_group_id,
        };
        self.inner.merge_compaction_group(req).await?;
        Ok(())
    }

    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
        let request = ListRateLimitsRequest {};
        let resp = self.inner.list_rate_limits(request).await?;
        Ok(resp.rate_limits)
    }

    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
        let request = ListIcebergTablesRequest {};
        let resp = self.inner.list_iceberg_tables(request).await?;
        Ok(resp.iceberg_tables)
    }

    pub async fn get_cluster_stack_trace(
        &self,
        actor_traces_format: ActorTracesFormat,
    ) -> Result<StackTraceResponse> {
        let request = StackTraceRequest {
            actor_traces_format: actor_traces_format as i32,
        };
        let resp = self.inner.stack_trace(request).await?;
        Ok(resp)
    }

    pub async fn set_sync_log_store_aligned(&self, job_id: u32, aligned: bool) -> Result<()> {
        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
        self.inner.set_sync_log_store_aligned(request).await?;
        Ok(())
    }
}

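/// Forwards Hummock (state store) metadata RPCs to the meta service. Note
/// that `commit_epoch_with_change_log` deliberately panics: in production,
/// only the meta service itself may commit epochs.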
#[async_trait]
impl HummockMetaClient for MetaClient {
    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
        let req = UnpinVersionBeforeRequest {
            context_id: self.worker_id(),
            unpin_version_before: unpin_version_before.to_u64(),
        };
        self.inner.unpin_version_before(req).await?;
        Ok(())
    }

    async fn get_current_version(&self) -> Result<HummockVersion> {
        let req = GetCurrentVersionRequest::default();
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .get_current_version(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
        let resp = self
            .inner
            .get_new_object_ids(GetNewObjectIdsRequest { number })
            .await?;
        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
    }

    async fn commit_epoch_with_change_log(
        &self,
        _epoch: HummockEpoch,
        _sync_result: SyncResult,
        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
    ) -> Result<()> {
        panic!("Only meta service can commit_epoch in production.")
    }

    async fn trigger_manual_compaction(
        &self,
        compaction_group_id: u64,
        table_id: u32,
        level: u32,
        sst_ids: Vec<u64>,
    ) -> Result<()> {
        let req = TriggerManualCompactionRequest {
            compaction_group_id,
            table_id,
            level,
            sst_ids,
            ..Default::default()
        };

        self.inner.trigger_manual_compaction(req).await?;
        Ok(())
    }

    async fn trigger_full_gc(
        &self,
        sst_retention_time_sec: u64,
        prefix: Option<String>,
    ) -> Result<()> {
        self.inner
            .trigger_full_gc(TriggerFullGcRequest {
                sst_retention_time_sec,
                prefix,
            })
            .await?;
        Ok(())
    }

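    /// Opens the bidirectional compaction event stream: creates an unbounded
    /// request channel, eagerly enqueues a `Register` event carrying this
    /// worker's id as the compactor context, and returns the sender together
    /// with the boxed response stream.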
    async fn subscribe_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeCompactionEventRequest>,
        BoxStream<'static, CompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeCompactionEventRequest>();
        request_sender
            .send(SubscribeCompactionEventRequest {
                event: Some(subscribe_compaction_event_request::Event::Register(
                    Register {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe compaction event")?;

        let stream = self
            .inner
            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }

    async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: u32,
    ) -> Result<PbHummockVersion> {
        self.get_version_by_epoch(epoch, table_id).await
    }

    async fn subscribe_iceberg_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
        BoxStream<'static, IcebergCompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
        request_sender
            .send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
                    IcebergRegister {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe compaction event")?;

        let stream = self
            .inner
            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }
}

#[async_trait]
impl TelemetryInfoFetcher for MetaClient {
    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
        let resp = self
            .get_telemetry_info()
            .await
            .map_err(|e| e.to_report_string())?;
        let tracking_id = resp.get_tracking_id().ok();
        Ok(tracking_id.map(|id| id.to_owned()))
    }
}

pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;

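/// One generated gRPC client per meta service, all sharing a single
/// `Channel`. The whole core is rebuilt via
/// `MetaMemberManagement::recreate_core` when the client switches to a new
/// meta leader.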
1883#[derive(Debug, Clone)]
1884struct GrpcMetaClientCore {
1885 cluster_client: ClusterServiceClient<Channel>,
1886 meta_member_client: MetaMemberServiceClient<Channel>,
1887 heartbeat_client: HeartbeatServiceClient<Channel>,
1888 ddl_client: DdlServiceClient<Channel>,
1889 hummock_client: HummockManagerServiceClient<Channel>,
1890 notification_client: NotificationServiceClient<Channel>,
1891 stream_client: StreamManagerServiceClient<Channel>,
1892 user_client: UserServiceClient<Channel>,
1893 scale_client: ScaleServiceClient<Channel>,
1894 backup_client: BackupServiceClient<Channel>,
1895 telemetry_client: TelemetryInfoServiceClient<Channel>,
1896 system_params_client: SystemParamsServiceClient<Channel>,
1897 session_params_client: SessionParamServiceClient<Channel>,
1898 serving_client: ServingServiceClient<Channel>,
1899 cloud_client: CloudServiceClient<Channel>,
1900 sink_coordinate_client: SinkCoordinationRpcClient,
1901 event_log_client: EventLogServiceClient<Channel>,
1902 cluster_limit_client: ClusterLimitServiceClient<Channel>,
1903 hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
1904 monitor_client: MonitorServiceClient<Channel>,
1905}
1906
1907impl GrpcMetaClientCore {
    pub(crate) fn new(channel: Channel) -> Self {
        let cluster_client = ClusterServiceClient::new(channel.clone());
        let meta_member_client = MetaMemberClient::new(channel.clone());
        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
        let ddl_client =
            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let hummock_client =
            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let notification_client =
            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let stream_client =
            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let user_client = UserServiceClient::new(channel.clone());
        let scale_client =
            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let backup_client = BackupServiceClient::new(channel.clone());
        let telemetry_client =
            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let system_params_client = SystemParamsServiceClient::new(channel.clone());
        let session_params_client = SessionParamServiceClient::new(channel.clone());
        let serving_client = ServingServiceClient::new(channel.clone());
        let cloud_client = CloudServiceClient::new(channel.clone());
        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
        let event_log_client = EventLogServiceClient::new(channel.clone());
        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
        let hosted_iceberg_catalog_service_client =
            HostedIcebergCatalogServiceClient::new(channel.clone());
        let monitor_client = MonitorServiceClient::new(channel);

        GrpcMetaClientCore {
            cluster_client,
            meta_member_client,
            heartbeat_client,
            ddl_client,
            hummock_client,
            notification_client,
            stream_client,
            user_client,
            scale_client,
            backup_client,
            telemetry_client,
            system_params_client,
            session_params_client,
            serving_client,
            cloud_client,
            sink_coordinate_client,
            event_log_client,
            cluster_limit_client,
            hosted_iceberg_catalog_service_client,
            monitor_client,
        }
    }
}

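/// Client to the meta server.
///
/// A thin wrapper over the tonic clients in [`GrpcMetaClientCore`]; the core
/// is kept behind an `RwLock` so the member monitor can swap it out when a
/// new meta leader is discovered.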
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    core: Arc<RwLock<GrpcMetaClientCore>>,
}

type MetaMemberClient = MetaMemberServiceClient<Channel>;

struct MetaMemberGroup {
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}

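/// State owned by the member-monitor task: the shared client core, the known
/// meta members (a single client under `LoadBalance`, an LRU map of
/// per-address clients under `List`), and the leader we are currently
/// connected to.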
struct MetaMemberManagement {
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    members: Either<MetaMemberClient, MetaMemberGroup>,
    current_leader: http::Uri,
    meta_config: MetaConfig,
}

impl MetaMemberManagement {
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
        format!("http://{}:{}", addr.host, addr.port)
            .parse()
            .unwrap()
    }

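    /// Rebuilds the shared client core on top of a freshly connected channel.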
    async fn recreate_core(&self, channel: Channel) {
        let mut core = self.core_ref.write().await;
        *core = GrpcMetaClientCore::new(channel);
    }

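    /// Fetches the current member list (from the single load-balanced client,
    /// or from the first reachable cached member), records newly joined
    /// members, and reconnects the client core if a new leader is discovered.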
    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            Either::Right(member_group) => {
                let mut fetched_members = None;

                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")?;

                        resp.into_inner().members
                    };

                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}

impl GrpcMetaClient {
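    // HTTP/2 keep-alive and initial-connection retry tuning for the channel
    // to the meta server.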
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;

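    /// Spawns the member-monitor task. Under the `List` strategy it refreshes
    /// members both on a periodic tick and on demand; under `LoadBalance` it
    /// refreshes only when a force-refresh request arrives.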
    fn start_meta_member_monitor(
        &self,
        init_leader_addr: http::Uri,
        members: Either<MetaMemberClient, MetaMemberGroup>,
        force_refresh_receiver: Receiver<Sender<Result<()>>>,
        meta_config: MetaConfig,
    ) -> Result<()> {
        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
        let current_leader = init_leader_addr;

        let enable_period_tick = matches!(members, Either::Right(_));

        let member_management = MetaMemberManagement {
            core_ref,
            members,
            current_leader,
            meta_config,
        };

        let mut force_refresh_receiver = force_refresh_receiver;

        tokio::spawn(async move {
            let mut member_management = member_management;
            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);

            loop {
                let event: Option<Sender<Result<()>>> = if enable_period_tick {
                    tokio::select! {
                        _ = ticker.tick() => None,
                        result_sender = force_refresh_receiver.recv() => {
                            if result_sender.is_none() {
                                break;
                            }

                            result_sender
                        },
                    }
                } else {
                    let result_sender = force_refresh_receiver.recv().await;

                    if result_sender.is_none() {
                        break;
                    }

                    result_sender
                };

                let tick_result = member_management.refresh_members().await;
                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
                }

                if let Some(sender) = event {
                    let _resp = sender.send(tick_result);
                }
            }
        });

        Ok(())
    }

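    /// Asks the member monitor for an immediate refresh and waits for its
    /// result.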
    async fn force_refresh_leader(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.member_monitor_event_sender
            .send(sender)
            .await
            .map_err(|e| anyhow!(e))?;

        receiver.await.map_err(|e| anyhow!(e))?
    }

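    /// Connects to the meta server(s) described by `strategy`, spawns the
    /// member monitor, and forces an initial leader refresh before returning.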
    pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result<Self> {
        let (channel, addr) = match strategy {
            MetaAddressStrategy::LoadBalance(addr) => {
                Self::try_build_rpc_channel(vec![addr.clone()]).await
            }
            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
        }?;
        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
        let client = GrpcMetaClient {
            member_monitor_event_sender: force_refresh_sender,
            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
        };

        let meta_member_client = client.core.read().await.meta_member_client.clone();
        let members = match strategy {
            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
            MetaAddressStrategy::List(addrs) => {
                // `LruCache::new` takes a `NonZeroUsize` capacity in recent
                // versions of the `lru` crate; the capacity bounds how many
                // member clients we cache.
                let mut members = LruCache::new(std::num::NonZeroUsize::new(20).unwrap());
                for addr in addrs {
                    members.put(addr.clone(), None);
                }
                members.put(addr.clone(), Some(meta_member_client));

                Either::Right(MetaMemberGroup { members })
            }
        };

        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config)?;

        client.force_refresh_leader().await?;

        Ok(client)
    }

    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
    }

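    /// Tries each address in order and returns the first channel that
    /// connects, together with the address it connected to.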
    pub(crate) async fn try_build_rpc_channel(
        addrs: impl IntoIterator<Item = http::Uri>,
    ) -> Result<(Channel, http::Uri)> {
        let endpoints: Vec<_> = addrs
            .into_iter()
            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
            .collect();

        let mut last_error = None;

        for (endpoint, addr) in endpoints {
            match Self::connect_to_endpoint(endpoint).await {
                Ok(channel) => {
                    tracing::info!("Connected to meta server {} successfully", addr);
                    return Ok((channel, addr));
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying next address",
                        addr,
                    );
                    last_error = Some(e);
                }
            }
        }

        if let Some(last_error) = last_error {
            Err(anyhow::anyhow!(last_error)
                .context("failed to connect to all meta servers")
                .into())
        } else {
            bail!("no meta server address provided")
        }
    }

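    /// Establishes a monitored channel to a single endpoint, with HTTP/2
    /// keep-alive and a 5s connect timeout.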
    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
        let channel = endpoint
            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-meta-client", Default::default())
            .await?
            .wrapped();

        Ok(channel)
    }

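    /// Exponential backoff with jitter whose *accumulated* delay stays below
    /// `high_bound`; with `exceed`, the one step that crosses the bound is
    /// still yielded.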
    pub(crate) fn retry_strategy_to_bound(
        high_bound: Duration,
        exceed: bool,
    ) -> impl Iterator<Item = Duration> {
        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
            .map(jitter);

        let mut sum = Duration::default();

        iter.take_while(move |duration| {
            sum += *duration;

            if exceed {
                sum < high_bound + *duration
            } else {
                sum < high_bound
            }
        })
    }
}

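// Lists every meta RPC as `{ client field, method, request type, response type }`.
// `meta_rpc_client_method_impl` expands each entry into a delegating method on
// `GrpcMetaClient` (see the `impl` block at the end of this file).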
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse }
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse }
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}

impl GrpcMetaClient {
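    /// Called on RPC failure: status codes that suggest the current meta node
    /// is no longer serving trigger a best-effort leader refresh.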
    async fn refresh_client_if_needed(&self, code: Code) {
        if matches!(
            code,
            Code::Unknown | Code::Unimplemented | Code::Unavailable
        ) {
            tracing::debug!("matching tonic code {}", code);
            let (result_sender, result_receiver) = oneshot::channel();
            if self
                .member_monitor_event_sender
                .try_send(result_sender)
                .is_ok()
            {
                if let Ok(Err(e)) = result_receiver.await {
                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
                }
            } else {
                tracing::debug!("skipping this refresh, another refresh is already in progress")
            }
        }
    }
}

impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}