1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{
33 AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
34};
35use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
36use risingwave_common::hash::WorkerSlotMapping;
37use risingwave_common::id::{
38 ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, UserId, ViewId, WorkerId,
39};
40use risingwave_common::monitor::EndpointExt;
41use risingwave_common::system_param::reader::SystemParamsReader;
42use risingwave_common::telemetry::report::TelemetryInfoFetcher;
43use risingwave_common::util::addr::HostAddr;
44use risingwave_common::util::meta_addr::MetaAddressStrategy;
45use risingwave_common::util::resource_util::cpu::total_cpu_available;
46use risingwave_common::util::resource_util::hostname;
47use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
48use risingwave_error::bail;
49use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
50use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
51use risingwave_hummock_sdk::{
52 CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
53};
54use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
55use risingwave_pb::backup_service::*;
56use risingwave_pb::catalog::{
57 Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
58 PbSubscription, PbTable, PbView, Table,
59};
60use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
61use risingwave_pb::cloud_service::*;
62use risingwave_pb::common::worker_node::Property;
63use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
64use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
65use risingwave_pb::ddl_service::alter_owner_request::Object;
66use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
67use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
68use risingwave_pb::ddl_service::*;
69use risingwave_pb::hummock::compact_task::TaskStatus;
70use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
71use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
72use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
73use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
74use risingwave_pb::hummock::write_limits::WriteLimit;
75use risingwave_pb::hummock::*;
76use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
77use risingwave_pb::iceberg_compaction::{
78 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
79 subscribe_iceberg_compaction_event_request,
80};
81use risingwave_pb::id::{ActorId, FragmentId, HummockSstableId, SourceId};
82use risingwave_pb::meta::alter_connector_props_request::{
83 AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
84};
85use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
86use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
87use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
88use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
89use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
90use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
91use risingwave_pb::meta::list_actor_states_response::ActorState;
92use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
93use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
94use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
95use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
96use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
97use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
98use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
99use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
100use risingwave_pb::meta::serving_service_client::ServingServiceClient;
101use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
102use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
103use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
104use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
105use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
106use risingwave_pb::meta::{FragmentDistribution, *};
107use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
108use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
109use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
110use risingwave_pb::secret::PbSecretRef;
111use risingwave_pb::stream_plan::StreamFragmentGraph;
112use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
113use risingwave_pb::user::update_user_request::UpdateField;
114use risingwave_pb::user::user_service_client::UserServiceClient;
115use risingwave_pb::user::*;
116use thiserror_ext::AsReport;
117use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
118use tokio::sync::oneshot::Sender;
119use tokio::sync::{RwLock, mpsc, oneshot};
120use tokio::task::JoinHandle;
121use tokio::time::{self};
122use tokio_retry::strategy::{ExponentialBackoff, jitter};
123use tokio_stream::wrappers::UnboundedReceiverStream;
124use tonic::transport::Endpoint;
125use tonic::{Code, Request, Streaming};
126
127use crate::channel::{Channel, WrappedChannelExt};
128use crate::error::{Result, RpcError};
129use crate::hummock_meta_client::{
130 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
131 IcebergCompactionEventItem,
132};
133use crate::meta_rpc_client_method_impl;
134
/// Handle a worker node uses to talk to the meta service.
///
/// Instances are built by `register_new_inner` from the `AddWorkerNode` response. The type is
/// `Clone`; clones share the same `shutting_down` flag through its `Arc`.
#[derive(Clone, Debug)]
pub struct MetaClient {
    /// Id taken from `AddWorkerNodeResponse::node_id` at registration.
    worker_id: WorkerId,
    /// Worker type this client registered as.
    worker_type: WorkerType,
    /// Address this worker registered with.
    host_addr: HostAddr,
    /// Underlying gRPC client that the request methods delegate to.
    inner: GrpcMetaClient,
    /// Meta configuration; `max_heartbeat_interval_secs` bounds the retry strategies.
    meta_config: Arc<MetaConfig>,
    /// Cluster id taken from the `AddWorkerNode` response.
    cluster_id: String,
    /// Set by `unregister` once the worker node has been deleted; `send_heartbeat` consults it
    /// before exiting the process on an `UnknownWorker` status.
    shutting_down: Arc<AtomicBool>,
}
146
147impl MetaClient {
148 pub fn worker_id(&self) -> WorkerId {
149 self.worker_id
150 }
151
152 pub fn host_addr(&self) -> &HostAddr {
153 &self.host_addr
154 }
155
156 pub fn worker_type(&self) -> WorkerType {
157 self.worker_type
158 }
159
160 pub fn cluster_id(&self) -> &str {
161 &self.cluster_id
162 }
163
164 pub async fn subscribe(
166 &self,
167 subscribe_type: SubscribeType,
168 ) -> Result<Streaming<SubscribeResponse>> {
169 let request = SubscribeRequest {
170 subscribe_type: subscribe_type as i32,
171 host: Some(self.host_addr.to_protobuf()),
172 worker_id: self.worker_id(),
173 };
174
175 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
176 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
177 true,
178 );
179
180 tokio_retry::Retry::spawn(retry_strategy, || async {
181 let request = request.clone();
182 self.inner.subscribe(request).await
183 })
184 .await
185 }
186
187 pub async fn create_connection(
188 &self,
189 connection_name: String,
190 database_id: DatabaseId,
191 schema_id: SchemaId,
192 owner_id: UserId,
193 req: create_connection_request::Payload,
194 ) -> Result<WaitVersion> {
195 let request = CreateConnectionRequest {
196 name: connection_name,
197 database_id,
198 schema_id,
199 owner_id,
200 payload: Some(req),
201 };
202 let resp = self.inner.create_connection(request).await?;
203 Ok(resp
204 .version
205 .ok_or_else(|| anyhow!("wait version not set"))?)
206 }
207
208 pub async fn create_secret(
209 &self,
210 secret_name: String,
211 database_id: DatabaseId,
212 schema_id: SchemaId,
213 owner_id: UserId,
214 value: Vec<u8>,
215 ) -> Result<WaitVersion> {
216 let request = CreateSecretRequest {
217 name: secret_name,
218 database_id,
219 schema_id,
220 owner_id,
221 value,
222 };
223 let resp = self.inner.create_secret(request).await?;
224 Ok(resp
225 .version
226 .ok_or_else(|| anyhow!("wait version not set"))?)
227 }
228
229 pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
230 let request = ListConnectionsRequest {};
231 let resp = self.inner.list_connections(request).await?;
232 Ok(resp.connections)
233 }
234
235 pub async fn drop_connection(
236 &self,
237 connection_id: ConnectionId,
238 cascade: bool,
239 ) -> Result<WaitVersion> {
240 let request = DropConnectionRequest {
241 connection_id,
242 cascade,
243 };
244 let resp = self.inner.drop_connection(request).await?;
245 Ok(resp
246 .version
247 .ok_or_else(|| anyhow!("wait version not set"))?)
248 }
249
250 pub async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<WaitVersion> {
251 let request = DropSecretRequest { secret_id, cascade };
252 let resp = self.inner.drop_secret(request).await?;
253 Ok(resp
254 .version
255 .ok_or_else(|| anyhow!("wait version not set"))?)
256 }
257
258 pub async fn register_new(
262 addr_strategy: MetaAddressStrategy,
263 worker_type: WorkerType,
264 addr: &HostAddr,
265 property: Property,
266 meta_config: Arc<MetaConfig>,
267 ) -> (Self, SystemParamsReader) {
268 let ret =
269 Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
270
271 match ret {
272 Ok(ret) => ret,
273 Err(err) => {
274 tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
275 std::process::exit(1);
276 }
277 }
278 }
279
280 async fn register_new_inner(
281 addr_strategy: MetaAddressStrategy,
282 worker_type: WorkerType,
283 addr: &HostAddr,
284 property: Property,
285 meta_config: Arc<MetaConfig>,
286 ) -> Result<(Self, SystemParamsReader)> {
287 tracing::info!("register meta client using strategy: {}", addr_strategy);
288
289 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
291 Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
292 true,
293 );
294
295 if property.is_unschedulable {
296 tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
297 }
298 let init_result: Result<_> = tokio_retry::RetryIf::spawn(
299 retry_strategy,
300 || async {
301 let grpc_meta_client =
302 GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
303
304 let add_worker_resp = grpc_meta_client
305 .add_worker_node(AddWorkerNodeRequest {
306 worker_type: worker_type as i32,
307 host: Some(addr.to_protobuf()),
308 property: Some(property.clone()),
309 resource: Some(risingwave_pb::common::worker_node::Resource {
310 rw_version: RW_VERSION.to_owned(),
311 total_memory_bytes: system_memory_available_bytes() as _,
312 total_cpu_cores: total_cpu_available() as _,
313 hostname: hostname(),
314 }),
315 })
316 .await
317 .context("failed to add worker node")?;
318
319 let system_params_resp = grpc_meta_client
320 .get_system_params(GetSystemParamsRequest {})
321 .await
322 .context("failed to get initial system params")?;
323
324 Ok((add_worker_resp, system_params_resp, grpc_meta_client))
325 },
326 |e: &RpcError| !e.is_from_tonic_server_impl(),
329 )
330 .await;
331
332 let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
333 let worker_id = add_worker_resp
334 .node_id
335 .expect("AddWorkerNodeResponse::node_id is empty");
336
337 let meta_client = Self {
338 worker_id,
339 worker_type,
340 host_addr: addr.clone(),
341 inner: grpc_meta_client,
342 meta_config: meta_config.clone(),
343 cluster_id: add_worker_resp.cluster_id,
344 shutting_down: Arc::new(false.into()),
345 };
346
347 static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
348 REPORT_PANIC.call_once(|| {
349 let meta_client_clone = meta_client.clone();
350 std::panic::update_hook(move |default_hook, info| {
351 meta_client_clone.try_add_panic_event_blocking(info, None);
353 default_hook(info);
354 });
355 });
356
357 Ok((meta_client, system_params_resp.params.unwrap().into()))
358 }
359
360 pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
362 let request = ActivateWorkerNodeRequest {
363 host: Some(addr.to_protobuf()),
364 node_id: self.worker_id,
365 };
366 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
367 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
368 true,
369 );
370 tokio_retry::Retry::spawn(retry_strategy, || async {
371 let request = request.clone();
372 self.inner.activate_worker_node(request).await
373 })
374 .await?;
375
376 Ok(())
377 }
378
379 pub async fn send_heartbeat(&self) -> Result<()> {
381 let request = HeartbeatRequest {
382 node_id: self.worker_id,
383 resource: Some(risingwave_pb::common::worker_node::Resource {
384 rw_version: RW_VERSION.to_owned(),
385 total_memory_bytes: system_memory_available_bytes() as _,
386 total_cpu_cores: total_cpu_available() as _,
387 hostname: hostname(),
388 }),
389 };
390 let resp = self.inner.heartbeat(request).await?;
391 if let Some(status) = resp.status
392 && status.code() == risingwave_pb::common::status::Code::UnknownWorker
393 {
394 if !self.shutting_down.load(Relaxed) {
397 tracing::error!(message = status.message, "worker expired");
398 std::process::exit(1);
399 }
400 }
401 Ok(())
402 }
403
404 pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
405 let request = CreateDatabaseRequest { db: Some(db) };
406 let resp = self.inner.create_database(request).await?;
407 Ok(resp
409 .version
410 .ok_or_else(|| anyhow!("wait version not set"))?)
411 }
412
413 pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
414 let request = CreateSchemaRequest {
415 schema: Some(schema),
416 };
417 let resp = self.inner.create_schema(request).await?;
418 Ok(resp
420 .version
421 .ok_or_else(|| anyhow!("wait version not set"))?)
422 }
423
424 pub async fn create_materialized_view(
425 &self,
426 table: PbTable,
427 graph: StreamFragmentGraph,
428 dependencies: HashSet<ObjectId>,
429 resource_type: streaming_job_resource_type::ResourceType,
430 if_not_exists: bool,
431 ) -> Result<WaitVersion> {
432 let request = CreateMaterializedViewRequest {
433 materialized_view: Some(table),
434 fragment_graph: Some(graph),
435 resource_type: Some(PbStreamingJobResourceType {
436 resource_type: Some(resource_type),
437 }),
438 dependencies: dependencies.into_iter().collect(),
439 if_not_exists,
440 };
441 let resp = self.inner.create_materialized_view(request).await?;
442 Ok(resp
444 .version
445 .ok_or_else(|| anyhow!("wait version not set"))?)
446 }
447
448 pub async fn drop_materialized_view(
449 &self,
450 table_id: TableId,
451 cascade: bool,
452 ) -> Result<WaitVersion> {
453 let request = DropMaterializedViewRequest { table_id, cascade };
454
455 let resp = self.inner.drop_materialized_view(request).await?;
456 Ok(resp
457 .version
458 .ok_or_else(|| anyhow!("wait version not set"))?)
459 }
460
461 pub async fn create_source(
462 &self,
463 source: PbSource,
464 graph: Option<StreamFragmentGraph>,
465 if_not_exists: bool,
466 ) -> Result<WaitVersion> {
467 let request = CreateSourceRequest {
468 source: Some(source),
469 fragment_graph: graph,
470 if_not_exists,
471 };
472
473 let resp = self.inner.create_source(request).await?;
474 Ok(resp
475 .version
476 .ok_or_else(|| anyhow!("wait version not set"))?)
477 }
478
479 pub async fn create_sink(
480 &self,
481 sink: PbSink,
482 graph: StreamFragmentGraph,
483 dependencies: HashSet<ObjectId>,
484 if_not_exists: bool,
485 ) -> Result<WaitVersion> {
486 let request = CreateSinkRequest {
487 sink: Some(sink),
488 fragment_graph: Some(graph),
489 dependencies: dependencies.into_iter().collect(),
490 if_not_exists,
491 };
492
493 let resp = self.inner.create_sink(request).await?;
494 Ok(resp
495 .version
496 .ok_or_else(|| anyhow!("wait version not set"))?)
497 }
498
499 pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
500 let request = CreateSubscriptionRequest {
501 subscription: Some(subscription),
502 };
503
504 let resp = self.inner.create_subscription(request).await?;
505 Ok(resp
506 .version
507 .ok_or_else(|| anyhow!("wait version not set"))?)
508 }
509
510 pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
511 let request = CreateFunctionRequest {
512 function: Some(function),
513 };
514 let resp = self.inner.create_function(request).await?;
515 Ok(resp
516 .version
517 .ok_or_else(|| anyhow!("wait version not set"))?)
518 }
519
520 pub async fn create_table(
521 &self,
522 source: Option<PbSource>,
523 table: PbTable,
524 graph: StreamFragmentGraph,
525 job_type: PbTableJobType,
526 if_not_exists: bool,
527 dependencies: HashSet<ObjectId>,
528 ) -> Result<WaitVersion> {
529 let request = CreateTableRequest {
530 materialized_view: Some(table),
531 fragment_graph: Some(graph),
532 source,
533 job_type: job_type as _,
534 if_not_exists,
535 dependencies: dependencies.into_iter().collect(),
536 };
537 let resp = self.inner.create_table(request).await?;
538 Ok(resp
540 .version
541 .ok_or_else(|| anyhow!("wait version not set"))?)
542 }
543
544 pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
545 let request = CommentOnRequest {
546 comment: Some(comment),
547 };
548 let resp = self.inner.comment_on(request).await?;
549 Ok(resp
550 .version
551 .ok_or_else(|| anyhow!("wait version not set"))?)
552 }
553
554 pub async fn alter_name(
555 &self,
556 object: alter_name_request::Object,
557 name: &str,
558 ) -> Result<WaitVersion> {
559 let request = AlterNameRequest {
560 object: Some(object),
561 new_name: name.to_owned(),
562 };
563 let resp = self.inner.alter_name(request).await?;
564 Ok(resp
565 .version
566 .ok_or_else(|| anyhow!("wait version not set"))?)
567 }
568
569 pub async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<WaitVersion> {
570 let request = AlterOwnerRequest {
571 object: Some(object),
572 owner_id,
573 };
574 let resp = self.inner.alter_owner(request).await?;
575 Ok(resp
576 .version
577 .ok_or_else(|| anyhow!("wait version not set"))?)
578 }
579
580 pub async fn alter_subscription_retention(
581 &self,
582 subscription_id: SubscriptionId,
583 retention_seconds: u64,
584 definition: String,
585 ) -> Result<WaitVersion> {
586 let request = AlterSubscriptionRetentionRequest {
587 subscription_id,
588 retention_seconds,
589 definition,
590 };
591 let resp = self.inner.alter_subscription_retention(request).await?;
592 Ok(resp
593 .version
594 .ok_or_else(|| anyhow!("wait version not set"))?)
595 }
596
597 pub async fn alter_database_param(
598 &self,
599 database_id: DatabaseId,
600 param: AlterDatabaseParam,
601 ) -> Result<WaitVersion> {
602 let request = match param {
603 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
604 let barrier_interval_ms = OptionalUint32 {
605 value: barrier_interval_ms,
606 };
607 AlterDatabaseParamRequest {
608 database_id,
609 param: Some(alter_database_param_request::Param::BarrierIntervalMs(
610 barrier_interval_ms,
611 )),
612 }
613 }
614 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
615 let checkpoint_frequency = OptionalUint64 {
616 value: checkpoint_frequency,
617 };
618 AlterDatabaseParamRequest {
619 database_id,
620 param: Some(alter_database_param_request::Param::CheckpointFrequency(
621 checkpoint_frequency,
622 )),
623 }
624 }
625 };
626 let resp = self.inner.alter_database_param(request).await?;
627 Ok(resp
628 .version
629 .ok_or_else(|| anyhow!("wait version not set"))?)
630 }
631
632 pub async fn alter_set_schema(
633 &self,
634 object: alter_set_schema_request::Object,
635 new_schema_id: SchemaId,
636 ) -> Result<WaitVersion> {
637 let request = AlterSetSchemaRequest {
638 new_schema_id,
639 object: Some(object),
640 };
641 let resp = self.inner.alter_set_schema(request).await?;
642 Ok(resp
643 .version
644 .ok_or_else(|| anyhow!("wait version not set"))?)
645 }
646
647 pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
648 let request = AlterSourceRequest {
649 source: Some(source),
650 };
651 let resp = self.inner.alter_source(request).await?;
652 Ok(resp
653 .version
654 .ok_or_else(|| anyhow!("wait version not set"))?)
655 }
656
657 pub async fn alter_parallelism(
658 &self,
659 job_id: JobId,
660 parallelism: PbTableParallelism,
661 deferred: bool,
662 ) -> Result<()> {
663 let request = AlterParallelismRequest {
664 table_id: job_id,
665 parallelism: Some(parallelism),
666 deferred,
667 };
668
669 self.inner.alter_parallelism(request).await?;
670 Ok(())
671 }
672
673 pub async fn alter_backfill_parallelism(
674 &self,
675 job_id: JobId,
676 parallelism: Option<PbTableParallelism>,
677 deferred: bool,
678 ) -> Result<()> {
679 let request = AlterBackfillParallelismRequest {
680 table_id: job_id,
681 parallelism,
682 deferred,
683 };
684
685 self.inner.alter_backfill_parallelism(request).await?;
686 Ok(())
687 }
688
689 pub async fn alter_streaming_job_config(
690 &self,
691 job_id: JobId,
692 entries_to_add: HashMap<String, String>,
693 keys_to_remove: Vec<String>,
694 ) -> Result<()> {
695 let request = AlterStreamingJobConfigRequest {
696 job_id,
697 entries_to_add,
698 keys_to_remove,
699 };
700
701 self.inner.alter_streaming_job_config(request).await?;
702 Ok(())
703 }
704
705 pub async fn alter_fragment_parallelism(
706 &self,
707 fragment_ids: Vec<FragmentId>,
708 parallelism: Option<PbTableParallelism>,
709 ) -> Result<()> {
710 let request = AlterFragmentParallelismRequest {
711 fragment_ids,
712 parallelism,
713 };
714
715 self.inner.alter_fragment_parallelism(request).await?;
716 Ok(())
717 }
718
719 pub async fn alter_cdc_table_backfill_parallelism(
720 &self,
721 table_id: JobId,
722 parallelism: PbTableParallelism,
723 ) -> Result<()> {
724 let request = AlterCdcTableBackfillParallelismRequest {
725 table_id,
726 parallelism: Some(parallelism),
727 };
728 self.inner
729 .alter_cdc_table_backfill_parallelism(request)
730 .await?;
731 Ok(())
732 }
733
734 pub async fn alter_resource_group(
735 &self,
736 table_id: TableId,
737 resource_group: Option<String>,
738 deferred: bool,
739 ) -> Result<()> {
740 let request = AlterResourceGroupRequest {
741 table_id,
742 resource_group,
743 deferred,
744 };
745
746 self.inner.alter_resource_group(request).await?;
747 Ok(())
748 }
749
750 pub async fn alter_swap_rename(
751 &self,
752 object: alter_swap_rename_request::Object,
753 ) -> Result<WaitVersion> {
754 let request = AlterSwapRenameRequest {
755 object: Some(object),
756 };
757 let resp = self.inner.alter_swap_rename(request).await?;
758 Ok(resp
759 .version
760 .ok_or_else(|| anyhow!("wait version not set"))?)
761 }
762
763 pub async fn alter_secret(
764 &self,
765 secret_id: SecretId,
766 secret_name: String,
767 database_id: DatabaseId,
768 schema_id: SchemaId,
769 owner_id: UserId,
770 value: Vec<u8>,
771 ) -> Result<WaitVersion> {
772 let request = AlterSecretRequest {
773 secret_id,
774 name: secret_name,
775 database_id,
776 schema_id,
777 owner_id,
778 value,
779 };
780 let resp = self.inner.alter_secret(request).await?;
781 Ok(resp
782 .version
783 .ok_or_else(|| anyhow!("wait version not set"))?)
784 }
785
786 pub async fn replace_job(
787 &self,
788 graph: StreamFragmentGraph,
789 replace_job: ReplaceJob,
790 ) -> Result<WaitVersion> {
791 let request = ReplaceJobPlanRequest {
792 plan: Some(ReplaceJobPlan {
793 fragment_graph: Some(graph),
794 replace_job: Some(replace_job),
795 }),
796 };
797 let resp = self.inner.replace_job_plan(request).await?;
798 Ok(resp
800 .version
801 .ok_or_else(|| anyhow!("wait version not set"))?)
802 }
803
804 pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
805 let request = AutoSchemaChangeRequest {
806 schema_change: Some(schema_change),
807 };
808 let _ = self.inner.auto_schema_change(request).await?;
809 Ok(())
810 }
811
812 pub async fn create_view(
813 &self,
814 view: PbView,
815 dependencies: HashSet<ObjectId>,
816 ) -> Result<WaitVersion> {
817 let request = CreateViewRequest {
818 view: Some(view),
819 dependencies: dependencies.into_iter().collect(),
820 };
821 let resp = self.inner.create_view(request).await?;
822 Ok(resp
824 .version
825 .ok_or_else(|| anyhow!("wait version not set"))?)
826 }
827
828 pub async fn create_index(
829 &self,
830 index: PbIndex,
831 table: PbTable,
832 graph: StreamFragmentGraph,
833 if_not_exists: bool,
834 ) -> Result<WaitVersion> {
835 let request = CreateIndexRequest {
836 index: Some(index),
837 index_table: Some(table),
838 fragment_graph: Some(graph),
839 if_not_exists,
840 };
841 let resp = self.inner.create_index(request).await?;
842 Ok(resp
844 .version
845 .ok_or_else(|| anyhow!("wait version not set"))?)
846 }
847
848 pub async fn drop_table(
849 &self,
850 source_id: Option<SourceId>,
851 table_id: TableId,
852 cascade: bool,
853 ) -> Result<WaitVersion> {
854 let request = DropTableRequest {
855 source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
856 table_id,
857 cascade,
858 };
859
860 let resp = self.inner.drop_table(request).await?;
861 Ok(resp
862 .version
863 .ok_or_else(|| anyhow!("wait version not set"))?)
864 }
865
866 pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
867 let request = CompactIcebergTableRequest { sink_id };
868 let resp = self.inner.compact_iceberg_table(request).await?;
869 Ok(resp.task_id)
870 }
871
872 pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
873 let request = ExpireIcebergTableSnapshotsRequest { sink_id };
874 let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
875 Ok(())
876 }
877
878 pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
879 let request = DropViewRequest { view_id, cascade };
880 let resp = self.inner.drop_view(request).await?;
881 Ok(resp
882 .version
883 .ok_or_else(|| anyhow!("wait version not set"))?)
884 }
885
886 pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
887 let request = DropSourceRequest { source_id, cascade };
888 let resp = self.inner.drop_source(request).await?;
889 Ok(resp
890 .version
891 .ok_or_else(|| anyhow!("wait version not set"))?)
892 }
893
894 pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
895 let request = ResetSourceRequest { source_id };
896 let resp = self.inner.reset_source(request).await?;
897 Ok(resp
898 .version
899 .ok_or_else(|| anyhow!("wait version not set"))?)
900 }
901
902 pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
903 let request = DropSinkRequest { sink_id, cascade };
904 let resp = self.inner.drop_sink(request).await?;
905 Ok(resp
906 .version
907 .ok_or_else(|| anyhow!("wait version not set"))?)
908 }
909
910 pub async fn drop_subscription(
911 &self,
912 subscription_id: SubscriptionId,
913 cascade: bool,
914 ) -> Result<WaitVersion> {
915 let request = DropSubscriptionRequest {
916 subscription_id,
917 cascade,
918 };
919 let resp = self.inner.drop_subscription(request).await?;
920 Ok(resp
921 .version
922 .ok_or_else(|| anyhow!("wait version not set"))?)
923 }
924
925 pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
926 let request = DropIndexRequest { index_id, cascade };
927 let resp = self.inner.drop_index(request).await?;
928 Ok(resp
929 .version
930 .ok_or_else(|| anyhow!("wait version not set"))?)
931 }
932
933 pub async fn drop_function(
934 &self,
935 function_id: FunctionId,
936 cascade: bool,
937 ) -> Result<WaitVersion> {
938 let request = DropFunctionRequest {
939 function_id,
940 cascade,
941 };
942 let resp = self.inner.drop_function(request).await?;
943 Ok(resp
944 .version
945 .ok_or_else(|| anyhow!("wait version not set"))?)
946 }
947
948 pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
949 let request = DropDatabaseRequest { database_id };
950 let resp = self.inner.drop_database(request).await?;
951 Ok(resp
952 .version
953 .ok_or_else(|| anyhow!("wait version not set"))?)
954 }
955
956 pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
957 let request = DropSchemaRequest { schema_id, cascade };
958 let resp = self.inner.drop_schema(request).await?;
959 Ok(resp
960 .version
961 .ok_or_else(|| anyhow!("wait version not set"))?)
962 }
963
964 pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
966 let request = CreateUserRequest { user: Some(user) };
967 let resp = self.inner.create_user(request).await?;
968 Ok(resp.version)
969 }
970
971 pub async fn drop_user(&self, user_id: UserId) -> Result<u64> {
972 let request = DropUserRequest { user_id };
973 let resp = self.inner.drop_user(request).await?;
974 Ok(resp.version)
975 }
976
977 pub async fn update_user(
978 &self,
979 user: UserInfo,
980 update_fields: Vec<UpdateField>,
981 ) -> Result<u64> {
982 let request = UpdateUserRequest {
983 user: Some(user),
984 update_fields: update_fields
985 .into_iter()
986 .map(|field| field as i32)
987 .collect::<Vec<_>>(),
988 };
989 let resp = self.inner.update_user(request).await?;
990 Ok(resp.version)
991 }
992
993 pub async fn grant_privilege(
994 &self,
995 user_ids: Vec<UserId>,
996 privileges: Vec<GrantPrivilege>,
997 with_grant_option: bool,
998 granted_by: UserId,
999 ) -> Result<u64> {
1000 let request = GrantPrivilegeRequest {
1001 user_ids,
1002 privileges,
1003 with_grant_option,
1004 granted_by,
1005 };
1006 let resp = self.inner.grant_privilege(request).await?;
1007 Ok(resp.version)
1008 }
1009
1010 pub async fn revoke_privilege(
1011 &self,
1012 user_ids: Vec<UserId>,
1013 privileges: Vec<GrantPrivilege>,
1014 granted_by: UserId,
1015 revoke_by: UserId,
1016 revoke_grant_option: bool,
1017 cascade: bool,
1018 ) -> Result<u64> {
1019 let request = RevokePrivilegeRequest {
1020 user_ids,
1021 privileges,
1022 granted_by,
1023 revoke_by,
1024 revoke_grant_option,
1025 cascade,
1026 };
1027 let resp = self.inner.revoke_privilege(request).await?;
1028 Ok(resp.version)
1029 }
1030
1031 pub async fn alter_default_privilege(
1032 &self,
1033 user_ids: Vec<UserId>,
1034 database_id: DatabaseId,
1035 schema_ids: Vec<SchemaId>,
1036 operation: AlterDefaultPrivilegeOperation,
1037 granted_by: UserId,
1038 ) -> Result<()> {
1039 let request = AlterDefaultPrivilegeRequest {
1040 user_ids,
1041 database_id,
1042 schema_ids,
1043 operation: Some(operation),
1044 granted_by,
1045 };
1046 self.inner.alter_default_privilege(request).await?;
1047 Ok(())
1048 }
1049
1050 pub async fn unregister(&self) -> Result<()> {
1052 let request = DeleteWorkerNodeRequest {
1053 host: Some(self.host_addr.to_protobuf()),
1054 };
1055 self.inner.delete_worker_node(request).await?;
1056 self.shutting_down.store(true, Relaxed);
1057 Ok(())
1058 }
1059
1060 pub async fn try_unregister(&self) {
1062 match self.unregister().await {
1063 Ok(_) => {
1064 tracing::info!(
1065 worker_id = %self.worker_id(),
1066 "successfully unregistered from meta service",
1067 )
1068 }
1069 Err(e) => {
1070 tracing::warn!(
1071 error = %e.as_report(),
1072 worker_id = %self.worker_id(),
1073 "failed to unregister from meta service",
1074 );
1075 }
1076 }
1077 }
1078
1079 pub async fn update_schedulability(
1080 &self,
1081 worker_ids: &[WorkerId],
1082 schedulability: Schedulability,
1083 ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
1084 let request = UpdateWorkerNodeSchedulabilityRequest {
1085 worker_ids: worker_ids.to_vec(),
1086 schedulability: schedulability.into(),
1087 };
1088 let resp = self
1089 .inner
1090 .update_worker_node_schedulability(request)
1091 .await?;
1092 Ok(resp)
1093 }
1094
1095 pub async fn list_worker_nodes(
1096 &self,
1097 worker_type: Option<WorkerType>,
1098 ) -> Result<Vec<WorkerNode>> {
1099 let request = ListAllNodesRequest {
1100 worker_type: worker_type.map(Into::into),
1101 include_starting_nodes: true,
1102 };
1103 let resp = self.inner.list_all_nodes(request).await?;
1104 Ok(resp.nodes)
1105 }
1106
    /// Spawns a tokio task that repeatedly calls `send_heartbeat` on `meta_client`,
    /// paced by an interval of `min_interval`.
    ///
    /// Returns the task's join handle together with the sender half of a oneshot
    /// channel; the loop returns once the paired receiver resolves.
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    // `biased` makes the branches be polled in declaration order,
                    // so the shutdown receiver is checked before the ticker.
                    biased;
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Otherwise wait for the next tick before sending a heartbeat.
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                // Bound each heartbeat by three times the interval; failures and
                // timeouts are only logged and the loop continues.
                match tokio::time::timeout(
                    min_interval * 3,
                    meta_client.send_heartbeat(),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }
1146
1147 pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1148 let request = RisectlListStateTablesRequest {};
1149 let resp = self.inner.risectl_list_state_tables(request).await?;
1150 Ok(resp.tables)
1151 }
1152
1153 pub async fn risectl_resume_backfill(
1154 &self,
1155 request: RisectlResumeBackfillRequest,
1156 ) -> Result<()> {
1157 self.inner.risectl_resume_backfill(request).await?;
1158 Ok(())
1159 }
1160
1161 pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1162 let request = FlushRequest { database_id };
1163 let resp = self.inner.flush(request).await?;
1164 Ok(resp.hummock_version_id)
1165 }
1166
1167 pub async fn wait(&self) -> Result<WaitVersion> {
1168 let request = WaitRequest {};
1169 let resp = self.inner.wait(request).await?;
1170 Ok(resp
1171 .version
1172 .ok_or_else(|| anyhow!("wait version not set"))?)
1173 }
1174
1175 pub async fn recover(&self) -> Result<()> {
1176 let request = RecoverRequest {};
1177 self.inner.recover(request).await?;
1178 Ok(())
1179 }
1180
1181 pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1182 let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1183 let resp = self.inner.cancel_creating_jobs(request).await?;
1184 Ok(resp.canceled_jobs)
1185 }
1186
1187 pub async fn list_table_fragments(
1188 &self,
1189 job_ids: &[JobId],
1190 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
1191 let request = ListTableFragmentsRequest {
1192 table_ids: job_ids.to_vec(),
1193 };
1194 let resp = self.inner.list_table_fragments(request).await?;
1195 Ok(resp.table_fragments)
1196 }
1197
1198 pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1199 let resp = self
1200 .inner
1201 .list_streaming_job_states(ListStreamingJobStatesRequest {})
1202 .await?;
1203 Ok(resp.states)
1204 }
1205
1206 pub async fn list_fragment_distributions(
1207 &self,
1208 include_node: bool,
1209 ) -> Result<Vec<FragmentDistribution>> {
1210 let resp = self
1211 .inner
1212 .list_fragment_distribution(ListFragmentDistributionRequest {
1213 include_node: Some(include_node),
1214 })
1215 .await?;
1216 Ok(resp.distributions)
1217 }
1218
1219 pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
1220 let resp = self
1221 .inner
1222 .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {
1223 include_node: Some(true),
1224 })
1225 .await?;
1226 Ok(resp.distributions)
1227 }
1228
1229 pub async fn get_fragment_by_id(
1230 &self,
1231 fragment_id: FragmentId,
1232 ) -> Result<Option<FragmentDistribution>> {
1233 let resp = self
1234 .inner
1235 .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1236 .await?;
1237 Ok(resp.distribution)
1238 }
1239
1240 pub async fn get_fragment_vnodes(
1241 &self,
1242 fragment_id: FragmentId,
1243 ) -> Result<Vec<(ActorId, Vec<u32>)>> {
1244 let resp = self
1245 .inner
1246 .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
1247 .await?;
1248 Ok(resp
1249 .actor_vnodes
1250 .into_iter()
1251 .map(|actor| (actor.actor_id, actor.vnode_indices))
1252 .collect())
1253 }
1254
1255 pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
1256 let resp = self
1257 .inner
1258 .get_actor_vnodes(GetActorVnodesRequest { actor_id })
1259 .await?;
1260 Ok(resp.vnode_indices)
1261 }
1262
1263 pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1264 let resp = self
1265 .inner
1266 .list_actor_states(ListActorStatesRequest {})
1267 .await?;
1268 Ok(resp.states)
1269 }
1270
1271 pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1272 let resp = self
1273 .inner
1274 .list_actor_splits(ListActorSplitsRequest {})
1275 .await?;
1276
1277 Ok(resp.actor_splits)
1278 }
1279
1280 pub async fn pause(&self) -> Result<PauseResponse> {
1281 let request = PauseRequest {};
1282 let resp = self.inner.pause(request).await?;
1283 Ok(resp)
1284 }
1285
1286 pub async fn resume(&self) -> Result<ResumeResponse> {
1287 let request = ResumeRequest {};
1288 let resp = self.inner.resume(request).await?;
1289 Ok(resp)
1290 }
1291
1292 pub async fn apply_throttle(
1293 &self,
1294 throttle_target: PbThrottleTarget,
1295 throttle_type: risingwave_pb::common::PbThrottleType,
1296 id: u32,
1297 rate: Option<u32>,
1298 ) -> Result<ApplyThrottleResponse> {
1299 let request = ApplyThrottleRequest {
1300 throttle_target: throttle_target as i32,
1301 throttle_type: throttle_type as i32,
1302 id,
1303 rate,
1304 };
1305 let resp = self.inner.apply_throttle(request).await?;
1306 Ok(resp)
1307 }
1308
1309 pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1310 let resp = self
1311 .inner
1312 .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1313 .await?;
1314 Ok(resp.get_status().unwrap())
1315 }
1316
1317 pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1318 let request = GetClusterInfoRequest {};
1319 let resp = self.inner.get_cluster_info(request).await?;
1320 Ok(resp)
1321 }
1322
1323 pub async fn reschedule(
1324 &self,
1325 worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1326 revision: u64,
1327 resolve_no_shuffle_upstream: bool,
1328 ) -> Result<(bool, u64)> {
1329 let request = RescheduleRequest {
1330 revision,
1331 resolve_no_shuffle_upstream,
1332 worker_reschedules,
1333 };
1334 let resp = self.inner.reschedule(request).await?;
1335 Ok((resp.success, resp.revision))
1336 }
1337
1338 pub async fn risectl_get_pinned_versions_summary(
1339 &self,
1340 ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1341 let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1342 self.inner
1343 .rise_ctl_get_pinned_versions_summary(request)
1344 .await
1345 }
1346
1347 pub async fn risectl_get_checkpoint_hummock_version(
1348 &self,
1349 ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1350 let request = RiseCtlGetCheckpointVersionRequest {};
1351 self.inner.rise_ctl_get_checkpoint_version(request).await
1352 }
1353
1354 pub async fn risectl_pause_hummock_version_checkpoint(
1355 &self,
1356 ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1357 let request = RiseCtlPauseVersionCheckpointRequest {};
1358 self.inner.rise_ctl_pause_version_checkpoint(request).await
1359 }
1360
1361 pub async fn risectl_resume_hummock_version_checkpoint(
1362 &self,
1363 ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1364 let request = RiseCtlResumeVersionCheckpointRequest {};
1365 self.inner.rise_ctl_resume_version_checkpoint(request).await
1366 }
1367
1368 pub async fn init_metadata_for_replay(
1369 &self,
1370 tables: Vec<PbTable>,
1371 compaction_groups: Vec<CompactionGroupInfo>,
1372 ) -> Result<()> {
1373 let req = InitMetadataForReplayRequest {
1374 tables,
1375 compaction_groups,
1376 };
1377 let _resp = self.inner.init_metadata_for_replay(req).await?;
1378 Ok(())
1379 }
1380
1381 pub async fn replay_version_delta(
1382 &self,
1383 version_delta: HummockVersionDelta,
1384 ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1385 let req = ReplayVersionDeltaRequest {
1386 version_delta: Some(version_delta.into()),
1387 };
1388 let resp = self.inner.replay_version_delta(req).await?;
1389 Ok((
1390 HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1391 resp.modified_compaction_groups,
1392 ))
1393 }
1394
1395 pub async fn list_version_deltas(
1396 &self,
1397 start_id: HummockVersionId,
1398 num_limit: u32,
1399 committed_epoch_limit: HummockEpoch,
1400 ) -> Result<Vec<HummockVersionDelta>> {
1401 let req = ListVersionDeltasRequest {
1402 start_id,
1403 num_limit,
1404 committed_epoch_limit,
1405 };
1406 Ok(self
1407 .inner
1408 .list_version_deltas(req)
1409 .await?
1410 .version_deltas
1411 .unwrap()
1412 .version_deltas
1413 .iter()
1414 .map(HummockVersionDelta::from_rpc_protobuf)
1415 .collect())
1416 }
1417
1418 pub async fn trigger_compaction_deterministic(
1419 &self,
1420 version_id: HummockVersionId,
1421 compaction_groups: Vec<CompactionGroupId>,
1422 ) -> Result<()> {
1423 let req = TriggerCompactionDeterministicRequest {
1424 version_id,
1425 compaction_groups,
1426 };
1427 self.inner.trigger_compaction_deterministic(req).await?;
1428 Ok(())
1429 }
1430
1431 pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1432 let req = DisableCommitEpochRequest {};
1433 Ok(HummockVersion::from_rpc_protobuf(
1434 &self
1435 .inner
1436 .disable_commit_epoch(req)
1437 .await?
1438 .current_version
1439 .unwrap(),
1440 ))
1441 }
1442
1443 pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1444 let req = GetAssignedCompactTaskNumRequest {};
1445 let resp = self.inner.get_assigned_compact_task_num(req).await?;
1446 Ok(resp.num_tasks as usize)
1447 }
1448
1449 pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1450 let req = RiseCtlListCompactionGroupRequest {};
1451 let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1452 Ok(resp.compaction_groups)
1453 }
1454
1455 pub async fn risectl_update_compaction_config(
1456 &self,
1457 compaction_groups: &[CompactionGroupId],
1458 configs: &[MutableConfig],
1459 ) -> Result<()> {
1460 let req = RiseCtlUpdateCompactionConfigRequest {
1461 compaction_group_ids: compaction_groups.to_vec(),
1462 configs: configs
1463 .iter()
1464 .map(
1465 |c| rise_ctl_update_compaction_config_request::MutableConfig {
1466 mutable_config: Some(c.clone()),
1467 },
1468 )
1469 .collect(),
1470 };
1471 let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1472 Ok(())
1473 }
1474
1475 pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1476 let req = BackupMetaRequest { remarks };
1477 let resp = self.inner.backup_meta(req).await?;
1478 Ok(resp.job_id)
1479 }
1480
1481 pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1482 let req = GetBackupJobStatusRequest { job_id };
1483 let resp = self.inner.get_backup_job_status(req).await?;
1484 Ok((resp.job_status(), resp.message))
1485 }
1486
1487 pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1488 let req = DeleteMetaSnapshotRequest {
1489 snapshot_ids: snapshot_ids.to_vec(),
1490 };
1491 let _resp = self.inner.delete_meta_snapshot(req).await?;
1492 Ok(())
1493 }
1494
1495 pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1496 let req = GetMetaSnapshotManifestRequest {};
1497 let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1498 Ok(resp.manifest.expect("should exist"))
1499 }
1500
1501 pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1502 let req = GetTelemetryInfoRequest {};
1503 let resp = self.inner.get_telemetry_info(req).await?;
1504 Ok(resp)
1505 }
1506
1507 pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1508 let req = GetMetaStoreInfoRequest {};
1509 let resp = self.inner.get_meta_store_info(req).await?;
1510 Ok(resp.meta_store_endpoint)
1511 }
1512
1513 pub async fn alter_sink_props(
1514 &self,
1515 sink_id: SinkId,
1516 changed_props: BTreeMap<String, String>,
1517 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1518 connector_conn_ref: Option<ConnectionId>,
1519 ) -> Result<()> {
1520 let req = AlterConnectorPropsRequest {
1521 object_id: sink_id.as_raw_id(),
1522 changed_props: changed_props.into_iter().collect(),
1523 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1524 connector_conn_ref,
1525 object_type: AlterConnectorPropsObject::Sink as i32,
1526 extra_options: None,
1527 };
1528 let _resp = self.inner.alter_connector_props(req).await?;
1529 Ok(())
1530 }
1531
1532 pub async fn alter_iceberg_table_props(
1533 &self,
1534 table_id: TableId,
1535 sink_id: SinkId,
1536 source_id: SourceId,
1537 changed_props: BTreeMap<String, String>,
1538 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1539 connector_conn_ref: Option<ConnectionId>,
1540 ) -> Result<()> {
1541 let req = AlterConnectorPropsRequest {
1542 object_id: table_id.as_raw_id(),
1543 changed_props: changed_props.into_iter().collect(),
1544 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1545 connector_conn_ref,
1546 object_type: AlterConnectorPropsObject::IcebergTable as i32,
1547 extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1548 sink_id,
1549 source_id,
1550 })),
1551 };
1552 let _resp = self.inner.alter_connector_props(req).await?;
1553 Ok(())
1554 }
1555
1556 pub async fn alter_source_connector_props(
1557 &self,
1558 source_id: SourceId,
1559 changed_props: BTreeMap<String, String>,
1560 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1561 connector_conn_ref: Option<ConnectionId>,
1562 ) -> Result<()> {
1563 let req = AlterConnectorPropsRequest {
1564 object_id: source_id.as_raw_id(),
1565 changed_props: changed_props.into_iter().collect(),
1566 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1567 connector_conn_ref,
1568 object_type: AlterConnectorPropsObject::Source as i32,
1569 extra_options: None,
1570 };
1571 let _resp = self.inner.alter_connector_props(req).await?;
1572 Ok(())
1573 }
1574
1575 pub async fn alter_connection_connector_props(
1576 &self,
1577 connection_id: u32,
1578 changed_props: BTreeMap<String, String>,
1579 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1580 ) -> Result<()> {
1581 let req = AlterConnectorPropsRequest {
1582 object_id: connection_id,
1583 changed_props: changed_props.into_iter().collect(),
1584 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1585 connector_conn_ref: None, object_type: AlterConnectorPropsObject::Connection as i32,
1587 extra_options: None,
1588 };
1589 let _resp = self.inner.alter_connector_props(req).await?;
1590 Ok(())
1591 }
1592
1593 pub async fn alter_source_properties_safe(
1596 &self,
1597 source_id: SourceId,
1598 changed_props: BTreeMap<String, String>,
1599 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1600 reset_splits: bool,
1601 ) -> Result<()> {
1602 let req = AlterSourcePropertiesSafeRequest {
1603 source_id: source_id.as_raw_id(),
1604 changed_props: changed_props.into_iter().collect(),
1605 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1606 options: Some(PropertyUpdateOptions { reset_splits }),
1607 };
1608 let _resp = self.inner.alter_source_properties_safe(req).await?;
1609 Ok(())
1610 }
1611
1612 pub async fn reset_source_splits(&self, source_id: SourceId) -> Result<()> {
1615 let req = ResetSourceSplitsRequest {
1616 source_id: source_id.as_raw_id(),
1617 };
1618 let _resp = self.inner.reset_source_splits(req).await?;
1619 Ok(())
1620 }
1621
1622 pub async fn inject_source_offsets(
1625 &self,
1626 source_id: SourceId,
1627 split_offsets: HashMap<String, String>,
1628 ) -> Result<Vec<String>> {
1629 let req = InjectSourceOffsetsRequest {
1630 source_id: source_id.as_raw_id(),
1631 split_offsets,
1632 };
1633 let resp = self.inner.inject_source_offsets(req).await?;
1634 Ok(resp.applied_split_ids)
1635 }
1636
1637 pub async fn set_system_param(
1638 &self,
1639 param: String,
1640 value: Option<String>,
1641 ) -> Result<Option<SystemParamsReader>> {
1642 let req = SetSystemParamRequest { param, value };
1643 let resp = self.inner.set_system_param(req).await?;
1644 Ok(resp.params.map(SystemParamsReader::from))
1645 }
1646
1647 pub async fn get_session_params(&self) -> Result<String> {
1648 let req = GetSessionParamsRequest {};
1649 let resp = self.inner.get_session_params(req).await?;
1650 Ok(resp.params)
1651 }
1652
1653 pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1654 let req = SetSessionParamRequest { param, value };
1655 let resp = self.inner.set_session_param(req).await?;
1656 Ok(resp.param)
1657 }
1658
1659 pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1660 let req = GetDdlProgressRequest {};
1661 let resp = self.inner.get_ddl_progress(req).await?;
1662 Ok(resp.ddl_progress)
1663 }
1664
1665 pub async fn split_compaction_group(
1666 &self,
1667 group_id: CompactionGroupId,
1668 table_ids_to_new_group: &[TableId],
1669 partition_vnode_count: u32,
1670 ) -> Result<CompactionGroupId> {
1671 let req = SplitCompactionGroupRequest {
1672 group_id,
1673 table_ids: table_ids_to_new_group.to_vec(),
1674 partition_vnode_count,
1675 };
1676 let resp = self.inner.split_compaction_group(req).await?;
1677 Ok(resp.new_group_id)
1678 }
1679
1680 pub async fn get_tables(
1681 &self,
1682 table_ids: Vec<TableId>,
1683 include_dropped_tables: bool,
1684 ) -> Result<HashMap<TableId, Table>> {
1685 let req = GetTablesRequest {
1686 table_ids,
1687 include_dropped_tables,
1688 };
1689 let resp = self.inner.get_tables(req).await?;
1690 Ok(resp.tables)
1691 }
1692
1693 pub async fn list_serving_vnode_mappings(
1694 &self,
1695 ) -> Result<HashMap<FragmentId, (JobId, WorkerSlotMapping)>> {
1696 let req = GetServingVnodeMappingsRequest {};
1697 let resp = self.inner.get_serving_vnode_mappings(req).await?;
1698 let mappings = resp
1699 .worker_slot_mappings
1700 .into_iter()
1701 .map(|p| {
1702 (
1703 p.fragment_id,
1704 (
1705 resp.fragment_to_table
1706 .get(&p.fragment_id)
1707 .cloned()
1708 .unwrap_or_default(),
1709 WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1710 ),
1711 )
1712 })
1713 .collect();
1714 Ok(mappings)
1715 }
1716
1717 pub async fn risectl_list_compaction_status(
1718 &self,
1719 ) -> Result<(
1720 Vec<CompactStatus>,
1721 Vec<CompactTaskAssignment>,
1722 Vec<CompactTaskProgress>,
1723 )> {
1724 let req = RiseCtlListCompactionStatusRequest {};
1725 let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1726 Ok((
1727 resp.compaction_statuses,
1728 resp.task_assignment,
1729 resp.task_progress,
1730 ))
1731 }
1732
1733 pub async fn get_compaction_score(
1734 &self,
1735 compaction_group_id: CompactionGroupId,
1736 ) -> Result<Vec<PickerInfo>> {
1737 let req = GetCompactionScoreRequest {
1738 compaction_group_id,
1739 };
1740 let resp = self.inner.get_compaction_score(req).await?;
1741 Ok(resp.scores)
1742 }
1743
1744 pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1745 let req = RiseCtlRebuildTableStatsRequest {};
1746 let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1747 Ok(())
1748 }
1749
1750 pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1751 let req = ListBranchedObjectRequest {};
1752 let resp = self.inner.list_branched_object(req).await?;
1753 Ok(resp.branched_objects)
1754 }
1755
1756 pub async fn list_active_write_limit(&self) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
1757 let req = ListActiveWriteLimitRequest {};
1758 let resp = self.inner.list_active_write_limit(req).await?;
1759 Ok(resp.write_limits)
1760 }
1761
1762 pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1763 let req = ListHummockMetaConfigRequest {};
1764 let resp = self.inner.list_hummock_meta_config(req).await?;
1765 Ok(resp.configs)
1766 }
1767
1768 pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1769 let _resp = self
1770 .inner
1771 .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1772 .await?;
1773
1774 Ok(())
1775 }
1776
1777 pub async fn rw_cloud_validate_source(
1778 &self,
1779 source_type: SourceType,
1780 source_config: HashMap<String, String>,
1781 ) -> Result<RwCloudValidateSourceResponse> {
1782 let req = RwCloudValidateSourceRequest {
1783 source_type: source_type.into(),
1784 source_config,
1785 };
1786 let resp = self.inner.rw_cloud_validate_source(req).await?;
1787 Ok(resp)
1788 }
1789
1790 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1791 self.inner.core.read().await.sink_coordinate_client.clone()
1792 }
1793
1794 pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1795 let req = ListCompactTaskAssignmentRequest {};
1796 let resp = self.inner.list_compact_task_assignment(req).await?;
1797 Ok(resp.task_assignment)
1798 }
1799
1800 pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1801 let req = ListEventLogRequest::default();
1802 let resp = self.inner.list_event_log(req).await?;
1803 Ok(resp.event_logs)
1804 }
1805
1806 pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1807 let req = ListCompactTaskProgressRequest {};
1808 let resp = self.inner.list_compact_task_progress(req).await?;
1809 Ok(resp.task_progress)
1810 }
1811
    /// No-op variant compiled only under `cfg(madsim)`; both arguments are ignored.
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }
1819
    /// Builds an `EventWorkerNodePanic` from this client's worker id, worker type and
    /// host address plus the formatted `panic_info`, and sends it via `add_event_log`.
    ///
    /// The request is issued from a freshly spawned thread that runs its own
    /// current-thread tokio runtime; the caller joins that thread, so this call blocks
    /// until the request finishes or the timeout elapses. The timeout is
    /// `timeout_millis` milliseconds, or 1000 when `None`. The outcome of the request
    /// and of the thread join are both discarded.
    ///
    /// # Panics
    /// The spawned thread panics if the tokio runtime cannot be built; that panic is
    /// swallowed by the ignored `join()` result.
    #[cfg(not(madsim))]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
        let event = event_log::EventWorkerNodePanic {
            worker_id: self.worker_id,
            worker_type: self.worker_type.into(),
            host_addr: Some(self.host_addr.to_protobuf()),
            panic_info: format!("{panic_info}"),
        };
        // Clone the inner client so it can be moved into the spawned thread.
        let grpc_meta_client = self.inner.clone();
        let _ = thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let req = AddEventLogRequest {
                event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
            };
            rt.block_on(async {
                // Best effort: both the timeout and the RPC result are ignored.
                let _ = tokio::time::timeout(
                    Duration::from_millis(timeout_millis.unwrap_or(1000)),
                    grpc_meta_client.add_event_log(req),
                )
                .await;
            });
        })
        .join();
    }
1852
1853 pub async fn add_sink_fail_evet(
1854 &self,
1855 sink_id: SinkId,
1856 sink_name: String,
1857 connector: String,
1858 error: String,
1859 ) -> Result<()> {
1860 let event = event_log::EventSinkFail {
1861 sink_id,
1862 sink_name,
1863 connector,
1864 error,
1865 };
1866 let req = AddEventLogRequest {
1867 event: Some(add_event_log_request::Event::SinkFail(event)),
1868 };
1869 self.inner.add_event_log(req).await?;
1870 Ok(())
1871 }
1872
1873 pub async fn add_cdc_auto_schema_change_fail_event(
1874 &self,
1875 source_id: SourceId,
1876 table_name: String,
1877 cdc_table_id: String,
1878 upstream_ddl: String,
1879 fail_info: String,
1880 ) -> Result<()> {
1881 let event = event_log::EventAutoSchemaChangeFail {
1882 table_id: source_id.as_cdc_table_id(),
1883 table_name,
1884 cdc_table_id,
1885 upstream_ddl,
1886 fail_info,
1887 };
1888 let req = AddEventLogRequest {
1889 event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1890 };
1891 self.inner.add_event_log(req).await?;
1892 Ok(())
1893 }
1894
1895 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1896 let req = CancelCompactTaskRequest {
1897 task_id,
1898 task_status: task_status as _,
1899 };
1900 let resp = self.inner.cancel_compact_task(req).await?;
1901 Ok(resp.ret)
1902 }
1903
1904 pub async fn get_version_by_epoch(
1905 &self,
1906 epoch: HummockEpoch,
1907 table_id: TableId,
1908 ) -> Result<PbHummockVersion> {
1909 let req = GetVersionByEpochRequest { epoch, table_id };
1910 let resp = self.inner.get_version_by_epoch(req).await?;
1911 Ok(resp.version.unwrap())
1912 }
1913
1914 pub async fn get_cluster_limits(
1915 &self,
1916 ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1917 let req = GetClusterLimitsRequest {};
1918 let resp = self.inner.get_cluster_limits(req).await?;
1919 Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1920 }
1921
1922 pub async fn merge_compaction_group(
1923 &self,
1924 left_group_id: CompactionGroupId,
1925 right_group_id: CompactionGroupId,
1926 ) -> Result<()> {
1927 let req = MergeCompactionGroupRequest {
1928 left_group_id,
1929 right_group_id,
1930 };
1931 self.inner.merge_compaction_group(req).await?;
1932 Ok(())
1933 }
1934
1935 pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1937 let request = ListRateLimitsRequest {};
1938 let resp = self.inner.list_rate_limits(request).await?;
1939 Ok(resp.rate_limits)
1940 }
1941
1942 pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1943 let request = ListCdcProgressRequest {};
1944 let resp = self.inner.list_cdc_progress(request).await?;
1945 Ok(resp.cdc_progress)
1946 }
1947
1948 pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1949 let request = ListRefreshTableStatesRequest {};
1950 let resp = self.inner.list_refresh_table_states(request).await?;
1951 Ok(resp.states)
1952 }
1953
1954 pub async fn list_sink_log_store_tables(
1955 &self,
1956 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1957 let request = ListSinkLogStoreTablesRequest {};
1958 let resp = self.inner.list_sink_log_store_tables(request).await?;
1959 Ok(resp.tables)
1960 }
1961
1962 pub async fn create_iceberg_table(
1963 &self,
1964 table_job_info: PbTableJobInfo,
1965 sink_job_info: PbSinkJobInfo,
1966 iceberg_source: PbSource,
1967 if_not_exists: bool,
1968 ) -> Result<WaitVersion> {
1969 let request = CreateIcebergTableRequest {
1970 table_info: Some(table_job_info),
1971 sink_info: Some(sink_job_info),
1972 iceberg_source: Some(iceberg_source),
1973 if_not_exists,
1974 };
1975
1976 let resp = self.inner.create_iceberg_table(request).await?;
1977 Ok(resp
1978 .version
1979 .ok_or_else(|| anyhow!("wait version not set"))?)
1980 }
1981
1982 pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
1983 let request = ListIcebergTablesRequest {};
1984 let resp = self.inner.list_iceberg_tables(request).await?;
1985 Ok(resp.iceberg_tables)
1986 }
1987
1988 pub async fn get_cluster_stack_trace(
1990 &self,
1991 actor_traces_format: ActorTracesFormat,
1992 ) -> Result<StackTraceResponse> {
1993 let request = StackTraceRequest {
1994 actor_traces_format: actor_traces_format as i32,
1995 };
1996 let resp = self.inner.stack_trace(request).await?;
1997 Ok(resp)
1998 }
1999
2000 pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
2001 let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
2002 self.inner.set_sync_log_store_aligned(request).await?;
2003 Ok(())
2004 }
2005
2006 pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
2007 self.inner.refresh(request).await
2008 }
2009
2010 pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
2011 let request = ListUnmigratedTablesRequest {};
2012 let resp = self.inner.list_unmigrated_tables(request).await?;
2013 Ok(resp
2014 .tables
2015 .into_iter()
2016 .map(|table| (table.table_id, table.table_name))
2017 .collect())
2018 }
2019}
2020
2021#[async_trait]
2022impl HummockMetaClient for MetaClient {
2023 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
2024 let req = UnpinVersionBeforeRequest {
2025 context_id: self.worker_id(),
2026 unpin_version_before,
2027 };
2028 self.inner.unpin_version_before(req).await?;
2029 Ok(())
2030 }
2031
2032 async fn get_current_version(&self) -> Result<HummockVersion> {
2033 let req = GetCurrentVersionRequest::default();
2034 Ok(HummockVersion::from_rpc_protobuf(
2035 &self
2036 .inner
2037 .get_current_version(req)
2038 .await?
2039 .current_version
2040 .unwrap(),
2041 ))
2042 }
2043
2044 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
2045 let resp = self
2046 .inner
2047 .get_new_object_ids(GetNewObjectIdsRequest { number })
2048 .await?;
2049 Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
2050 }
2051
2052 async fn commit_epoch_with_change_log(
2053 &self,
2054 _epoch: HummockEpoch,
2055 _sync_result: SyncResult,
2056 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
2057 ) -> Result<()> {
2058 panic!("Only meta service can commit_epoch in production.")
2059 }
2060
2061 async fn trigger_manual_compaction(
2062 &self,
2063 compaction_group_id: CompactionGroupId,
2064 table_id: JobId,
2065 level: u32,
2066 sst_ids: Vec<HummockSstableId>,
2067 ) -> Result<()> {
2068 let req = TriggerManualCompactionRequest {
2070 compaction_group_id,
2071 table_id,
2072 level,
2075 sst_ids,
2076 ..Default::default()
2077 };
2078
2079 self.inner.trigger_manual_compaction(req).await?;
2080 Ok(())
2081 }
2082
2083 async fn trigger_full_gc(
2084 &self,
2085 sst_retention_time_sec: u64,
2086 prefix: Option<String>,
2087 ) -> Result<()> {
2088 self.inner
2089 .trigger_full_gc(TriggerFullGcRequest {
2090 sst_retention_time_sec,
2091 prefix,
2092 })
2093 .await?;
2094 Ok(())
2095 }
2096
2097 async fn subscribe_compaction_event(
2098 &self,
2099 ) -> Result<(
2100 UnboundedSender<SubscribeCompactionEventRequest>,
2101 BoxStream<'static, CompactionEventItem>,
2102 )> {
2103 let (request_sender, request_receiver) =
2104 unbounded_channel::<SubscribeCompactionEventRequest>();
2105 request_sender
2106 .send(SubscribeCompactionEventRequest {
2107 event: Some(subscribe_compaction_event_request::Event::Register(
2108 Register {
2109 context_id: self.worker_id(),
2110 },
2111 )),
2112 create_at: SystemTime::now()
2113 .duration_since(SystemTime::UNIX_EPOCH)
2114 .expect("Clock may have gone backwards")
2115 .as_millis() as u64,
2116 })
2117 .context("Failed to subscribe compaction event")?;
2118
2119 let stream = self
2120 .inner
2121 .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
2122 request_receiver,
2123 )))
2124 .await?;
2125
2126 Ok((request_sender, Box::pin(stream)))
2127 }
2128
2129 async fn get_version_by_epoch(
2130 &self,
2131 epoch: HummockEpoch,
2132 table_id: TableId,
2133 ) -> Result<PbHummockVersion> {
2134 self.get_version_by_epoch(epoch, table_id).await
2135 }
2136
2137 async fn subscribe_iceberg_compaction_event(
2138 &self,
2139 ) -> Result<(
2140 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2141 BoxStream<'static, IcebergCompactionEventItem>,
2142 )> {
2143 let (request_sender, request_receiver) =
2144 unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2145 request_sender
2146 .send(SubscribeIcebergCompactionEventRequest {
2147 event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2148 IcebergRegister {
2149 context_id: self.worker_id(),
2150 },
2151 )),
2152 create_at: SystemTime::now()
2153 .duration_since(SystemTime::UNIX_EPOCH)
2154 .expect("Clock may have gone backwards")
2155 .as_millis() as u64,
2156 })
2157 .context("Failed to subscribe compaction event")?;
2158
2159 let stream = self
2160 .inner
2161 .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2162 request_receiver,
2163 )))
2164 .await?;
2165
2166 Ok((request_sender, Box::pin(stream)))
2167 }
2168}
2169
2170#[async_trait]
2171impl TelemetryInfoFetcher for MetaClient {
2172 async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2173 let resp = self
2174 .get_telemetry_info()
2175 .await
2176 .map_err(|e| e.to_report_string())?;
2177 let tracking_id = resp.get_tracking_id().ok();
2178 Ok(tracking_id.map(|id| id.to_owned()))
2179 }
2180}
2181
/// Sink coordination service client over a tonic [`Channel`].
pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2183
/// Bundle of gRPC service clients, one field per meta service.
///
/// `GrpcMetaClientCore::new` builds every client from clones of a single
/// channel, so the whole bundle is replaced together when the channel changes.
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}
2207
2208impl GrpcMetaClientCore {
2209 pub(crate) fn new(channel: Channel) -> Self {
2210 let cluster_client = ClusterServiceClient::new(channel.clone());
2211 let meta_member_client = MetaMemberClient::new(channel.clone());
2212 let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2213 let ddl_client =
2214 DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2215 let hummock_client =
2216 HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2217 let notification_client =
2218 NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2219 let stream_client =
2220 StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2221 let user_client = UserServiceClient::new(channel.clone());
2222 let scale_client =
2223 ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2224 let backup_client = BackupServiceClient::new(channel.clone());
2225 let telemetry_client =
2226 TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2227 let system_params_client = SystemParamsServiceClient::new(channel.clone());
2228 let session_params_client = SessionParamServiceClient::new(channel.clone());
2229 let serving_client = ServingServiceClient::new(channel.clone());
2230 let cloud_client = CloudServiceClient::new(channel.clone());
2231 let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2232 .max_decoding_message_size(usize::MAX);
2233 let event_log_client = EventLogServiceClient::new(channel.clone());
2234 let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2235 let hosted_iceberg_catalog_service_client =
2236 HostedIcebergCatalogServiceClient::new(channel.clone());
2237 let monitor_client = MonitorServiceClient::new(channel);
2238
2239 GrpcMetaClientCore {
2240 cluster_client,
2241 meta_member_client,
2242 heartbeat_client,
2243 ddl_client,
2244 hummock_client,
2245 notification_client,
2246 stream_client,
2247 user_client,
2248 scale_client,
2249 backup_client,
2250 telemetry_client,
2251 system_params_client,
2252 session_params_client,
2253 serving_client,
2254 cloud_client,
2255 sink_coordinate_client,
2256 event_log_client,
2257 cluster_limit_client,
2258 hosted_iceberg_catalog_service_client,
2259 monitor_client,
2260 }
2261 }
2262}
2263
/// Handle pairing the shared [`GrpcMetaClientCore`] with the channel used to
/// ask the member monitor task for a refresh.
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    /// Carries a oneshot reply sender to the monitor task; the task runs a
    /// member refresh and sends the outcome back through that oneshot.
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    /// Client bundle shared with the monitor task, which overwrites it when a
    /// different leader is discovered.
    core: Arc<RwLock<GrpcMetaClientCore>>,
}
2272
/// Meta member service client; its `members` RPC is what leader discovery
/// queries.
type MetaMemberClient = MetaMemberServiceClient<Channel>;
2274
/// Known meta member addresses and their clients.
struct MetaMemberGroup {
    /// Address -> client. An entry holds `None` until `refresh_members`
    /// first connects to that address and caches the client.
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}
2278
/// State owned by the member monitor task for tracking the meta leader.
struct MetaMemberManagement {
    /// Shared client bundle; rebuilt on a new channel when the leader changes.
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    /// `Left`: a single member client (load-balance strategy).
    /// `Right`: a group of member addresses (address-list strategy).
    members: Either<MetaMemberClient, MetaMemberGroup>,
    /// Leader address the client core was last built for.
    current_leader: http::Uri,
    /// Supplies `meta_leader_lease_secs`, which bounds the reconnect retries.
    meta_config: Arc<MetaConfig>,
}
2285
2286impl MetaMemberManagement {
2287 const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2288
2289 fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2290 format!("http://{}:{}", addr.host, addr.port)
2291 .parse()
2292 .unwrap()
2293 }
2294
2295 async fn recreate_core(&self, channel: Channel) {
2296 let mut core = self.core_ref.write().await;
2297 *core = GrpcMetaClientCore::new(channel);
2298 }
2299
2300 async fn refresh_members(&mut self) -> Result<()> {
2301 let leader_addr = match self.members.as_mut() {
2302 Either::Left(client) => {
2303 let resp = client
2304 .to_owned()
2305 .members(MembersRequest {})
2306 .await
2307 .map_err(RpcError::from_meta_status)?;
2308 let resp = resp.into_inner();
2309 resp.members.into_iter().find(|member| member.is_leader)
2310 }
2311 Either::Right(member_group) => {
2312 let mut fetched_members = None;
2313
2314 for (addr, client) in &mut member_group.members {
2315 let members: Result<_> = try {
2316 let mut client = match client {
2317 Some(cached_client) => cached_client.to_owned(),
2318 None => {
2319 let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2320 let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2321 .await
2322 .context("failed to create client")?;
2323 let new_client: MetaMemberClient =
2324 MetaMemberServiceClient::new(channel);
2325 *client = Some(new_client.clone());
2326
2327 new_client
2328 }
2329 };
2330
2331 let resp = client
2332 .members(MembersRequest {})
2333 .await
2334 .context("failed to fetch members")?;
2335
2336 resp.into_inner().members
2337 };
2338
2339 let fetched = members.is_ok();
2340 fetched_members = Some(members);
2341 if fetched {
2342 break;
2343 }
2344 }
2345
2346 let members = fetched_members
2347 .context("no member available in the list")?
2348 .context("could not refresh members")?;
2349
2350 let mut leader = None;
2352 for member in members {
2353 if member.is_leader {
2354 leader = Some(member.clone());
2355 }
2356
2357 let addr = Self::host_address_to_uri(member.address.unwrap());
2358 if !member_group.members.contains(&addr) {
2360 tracing::info!("new meta member joined: {}", addr);
2361 member_group.members.put(addr, None);
2362 }
2363 }
2364
2365 leader
2366 }
2367 };
2368
2369 if let Some(leader) = leader_addr {
2370 let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2371
2372 if discovered_leader != self.current_leader {
2373 tracing::info!("new meta leader {} discovered", discovered_leader);
2374
2375 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2376 Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2377 false,
2378 );
2379
2380 let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2381 let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2382 GrpcMetaClient::connect_to_endpoint(endpoint).await
2383 })
2384 .await?;
2385
2386 self.recreate_core(channel).await;
2387 self.current_leader = discovered_leader;
2388 }
2389 }
2390
2391 Ok(())
2392 }
2393}
2394
2395impl GrpcMetaClient {
2396 const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2398 const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2400 const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2402 const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2404
2405 fn start_meta_member_monitor(
2406 &self,
2407 init_leader_addr: http::Uri,
2408 members: Either<MetaMemberClient, MetaMemberGroup>,
2409 force_refresh_receiver: Receiver<Sender<Result<()>>>,
2410 meta_config: Arc<MetaConfig>,
2411 ) -> Result<()> {
2412 let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2413 let current_leader = init_leader_addr;
2414
2415 let enable_period_tick = matches!(members, Either::Right(_));
2416
2417 let member_management = MetaMemberManagement {
2418 core_ref,
2419 members,
2420 current_leader,
2421 meta_config,
2422 };
2423
2424 let mut force_refresh_receiver = force_refresh_receiver;
2425
2426 tokio::spawn(async move {
2427 let mut member_management = member_management;
2428 let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2429
2430 loop {
2431 let event: Option<Sender<Result<()>>> = if enable_period_tick {
2432 tokio::select! {
2433 _ = ticker.tick() => None,
2434 result_sender = force_refresh_receiver.recv() => {
2435 if result_sender.is_none() {
2436 break;
2437 }
2438
2439 result_sender
2440 },
2441 }
2442 } else {
2443 let result_sender = force_refresh_receiver.recv().await;
2444
2445 if result_sender.is_none() {
2446 break;
2447 }
2448
2449 result_sender
2450 };
2451
2452 let tick_result = member_management.refresh_members().await;
2453 if let Err(e) = tick_result.as_ref() {
2454 tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2455 }
2456
2457 if let Some(sender) = event {
2458 let _resp = sender.send(tick_result);
2460 }
2461 }
2462 });
2463
2464 Ok(())
2465 }
2466
2467 async fn force_refresh_leader(&self) -> Result<()> {
2468 let (sender, receiver) = oneshot::channel();
2469
2470 self.member_monitor_event_sender
2471 .send(sender)
2472 .await
2473 .map_err(|e| anyhow!(e))?;
2474
2475 receiver.await.map_err(|e| anyhow!(e))?
2476 }
2477
2478 pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2480 let (channel, addr) = match strategy {
2481 MetaAddressStrategy::LoadBalance(addr) => {
2482 Self::try_build_rpc_channel(vec![addr.clone()]).await
2483 }
2484 MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2485 }?;
2486 let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2487 let client = GrpcMetaClient {
2488 member_monitor_event_sender: force_refresh_sender,
2489 core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2490 };
2491
2492 let meta_member_client = client.core.read().await.meta_member_client.clone();
2493 let members = match strategy {
2494 MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2495 MetaAddressStrategy::List(addrs) => {
2496 let mut members = LruCache::new(20.try_into().unwrap());
2497 for addr in addrs {
2498 members.put(addr.clone(), None);
2499 }
2500 members.put(addr.clone(), Some(meta_member_client));
2501
2502 Either::Right(MetaMemberGroup { members })
2503 }
2504 };
2505
2506 client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2507
2508 client.force_refresh_leader().await?;
2509
2510 Ok(client)
2511 }
2512
2513 fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2514 Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2515 }
2516
2517 pub(crate) async fn try_build_rpc_channel(
2518 addrs: impl IntoIterator<Item = http::Uri>,
2519 ) -> Result<(Channel, http::Uri)> {
2520 let endpoints: Vec<_> = addrs
2521 .into_iter()
2522 .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2523 .collect();
2524
2525 let mut last_error = None;
2526
2527 for (endpoint, addr) in endpoints {
2528 match Self::connect_to_endpoint(endpoint).await {
2529 Ok(channel) => {
2530 tracing::info!("Connect to meta server {} successfully", addr);
2531 return Ok((channel, addr));
2532 }
2533 Err(e) => {
2534 tracing::warn!(
2535 error = %e.as_report(),
2536 "Failed to connect to meta server {}, trying again",
2537 addr,
2538 );
2539 last_error = Some(e);
2540 }
2541 }
2542 }
2543
2544 if let Some(last_error) = last_error {
2545 Err(anyhow::anyhow!(last_error)
2546 .context("failed to connect to all meta servers")
2547 .into())
2548 } else {
2549 bail!("no meta server address provided")
2550 }
2551 }
2552
2553 async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2554 let channel = endpoint
2555 .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2556 .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2557 .connect_timeout(Duration::from_secs(5))
2558 .monitored_connect("grpc-meta-client", Default::default())
2559 .await?
2560 .wrapped();
2561
2562 Ok(channel)
2563 }
2564
2565 pub(crate) fn retry_strategy_to_bound(
2566 high_bound: Duration,
2567 exceed: bool,
2568 ) -> impl Iterator<Item = Duration> {
2569 let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2570 .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2571 .map(jitter);
2572
2573 let mut sum = Duration::default();
2574
2575 iter.take_while(move |duration| {
2576 sum += *duration;
2577
2578 if exceed {
2579 sum < high_bound + *duration
2580 } else {
2581 sum < high_bound
2582 }
2583 })
2584 }
2585}
2586
/// Invokes `$macro` with the table of meta RPCs.
///
/// Each entry is `{ client, method, Request, Response }`, where `client` is
/// the name of a `GrpcMetaClientCore` field and `method` is the RPC to call
/// on it. How the entries are expanded is decided by the macro passed in,
/// which is defined outside this part of the file.
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_sink_log_store_tables, ListSinkLogStoreTablesRequest, ListSinkLogStoreTablesResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, alter_source_properties_safe, AlterSourcePropertiesSafeRequest, AlterSourcePropertiesSafeResponse }
            ,{ stream_client, reset_source_splits, ResetSourceSplitsRequest, ResetSourceSplitsResponse }
            ,{ stream_client, inject_source_offsets, InjectSourceOffsetsRequest, InjectSourceOffsetsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_subscription_retention, AlterSubscriptionRetentionRequest, AlterSubscriptionRetentionResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_backfill_parallelism, AlterBackfillParallelismRequest, AlterBackfillParallelismResponse }
            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, risectl_resume_backfill, RisectlResumeBackfillRequest, RisectlResumeBackfillResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}
2734
2735impl GrpcMetaClient {
2736 async fn refresh_client_if_needed(&self, code: Code) {
2737 if matches!(
2738 code,
2739 Code::Unknown | Code::Unimplemented | Code::Unavailable
2740 ) {
2741 tracing::debug!("matching tonic code {}", code);
2742 let (result_sender, result_receiver) = oneshot::channel();
2743 if self
2744 .member_monitor_event_sender
2745 .try_send(result_sender)
2746 .is_ok()
2747 {
2748 if let Ok(Err(e)) = result_receiver.await {
2749 tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2750 }
2751 } else {
2752 tracing::debug!("skipping the current refresh, somewhere else is already doing it")
2753 }
2754 }
2755 }
2756}
2757
impl GrpcMetaClient {
    // Expands `meta_rpc_client_method_impl` (defined outside this part of the
    // file) over every entry of `for_all_meta_rpc`; presumably this generates
    // one method per listed RPC — confirm against the macro definition.
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}