1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{
33 AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
34};
35use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
36use risingwave_common::hash::WorkerSlotMapping;
37use risingwave_common::id::{
38 ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, UserId, ViewId, WorkerId,
39};
40use risingwave_common::monitor::EndpointExt;
41use risingwave_common::system_param::AdaptiveParallelismStrategy;
42use risingwave_common::system_param::reader::SystemParamsReader;
43use risingwave_common::telemetry::report::TelemetryInfoFetcher;
44use risingwave_common::util::addr::HostAddr;
45use risingwave_common::util::meta_addr::MetaAddressStrategy;
46use risingwave_common::util::resource_util::cpu::total_cpu_available;
47use risingwave_common::util::resource_util::hostname;
48use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
49use risingwave_error::bail;
50use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
51use risingwave_hummock_sdk::change_log::{TableChangeLog, TableChangeLogs};
52use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
53use risingwave_hummock_sdk::{
54 CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
55};
56use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
57use risingwave_pb::backup_service::*;
58use risingwave_pb::catalog::{
59 Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
60 PbSubscription, PbTable, PbView, Table,
61};
62use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
63use risingwave_pb::cloud_service::*;
64use risingwave_pb::common::worker_node::Property;
65use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
66use risingwave_pb::configured_monitor_service_client;
67use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
68use risingwave_pb::ddl_service::alter_owner_request::Object;
69use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
70use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
71use risingwave_pb::ddl_service::*;
72use risingwave_pb::hummock::compact_task::TaskStatus;
73use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
74use risingwave_pb::hummock::get_table_change_logs_request::PbTableFilter;
75use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
76use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
77use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
78use risingwave_pb::hummock::write_limits::WriteLimit;
79use risingwave_pb::hummock::*;
80use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
81use risingwave_pb::iceberg_compaction::{
82 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
83 subscribe_iceberg_compaction_event_request,
84};
85use risingwave_pb::id::{ActorId, FragmentId, HummockSstableId, SourceId};
86use risingwave_pb::meta::alter_connector_props_request::{
87 AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
88};
89use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
90use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
91use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
92use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
93use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
94use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
95use risingwave_pb::meta::list_actor_states_response::ActorState;
96use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
97use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
98use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
99use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
100use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
101use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
102use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
103use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
104use risingwave_pb::meta::serving_service_client::ServingServiceClient;
105use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
106use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
107use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
108use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
109use risingwave_pb::meta::{FragmentDistribution, *};
110use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
111use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
112use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
113use risingwave_pb::secret::PbSecretRef;
114use risingwave_pb::stream_plan::StreamFragmentGraph;
115use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
116use risingwave_pb::user::update_user_request::UpdateField;
117use risingwave_pb::user::user_service_client::UserServiceClient;
118use risingwave_pb::user::*;
119use thiserror_ext::AsReport;
120use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
121use tokio::sync::oneshot::Sender;
122use tokio::sync::{RwLock, mpsc, oneshot};
123use tokio::task::JoinHandle;
124use tokio::time::{self};
125use tokio_retry::strategy::{ExponentialBackoff, jitter};
126use tokio_stream::wrappers::UnboundedReceiverStream;
127use tonic::transport::Endpoint;
128use tonic::{Code, Request, Streaming};
129
130use crate::channel::{Channel, WrappedChannelExt};
131use crate::error::{Result, RpcError};
132use crate::hummock_meta_client::{
133 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
134 IcebergCompactionEventItem,
135};
136use crate::meta_rpc_client_method_impl;
137
/// Client handle a worker uses to talk to the meta service.
///
/// Bundles the identity assigned at registration (`worker_id`, `cluster_id`) with the
/// underlying gRPC client and the meta configuration.
#[derive(Clone, Debug)]
pub struct MetaClient {
    /// Id of this worker, taken from `AddWorkerNodeResponse::node_id` during registration.
    worker_id: WorkerId,
    /// Kind of worker this client was registered as.
    worker_type: WorkerType,
    /// Address this worker registered with; also sent in subscribe and unregister requests.
    host_addr: HostAddr,
    /// Underlying gRPC client that every RPC wrapper delegates to.
    inner: GrpcMetaClient,
    /// Meta configuration; `max_heartbeat_interval_secs` is read to bound retry strategies.
    meta_config: Arc<MetaConfig>,
    /// Cluster id returned by the meta service in the add-worker response.
    cluster_id: String,
    /// Set to `true` by `unregister` once the delete-worker RPC succeeds. `send_heartbeat`
    /// reads it to decide whether an `UnknownWorker` status should terminate the process.
    shutting_down: Arc<AtomicBool>,
}
149
150impl MetaClient {
/// Returns the id of this worker.
pub fn worker_id(&self) -> WorkerId {
    self.worker_id
}
154
/// Returns the host address this client was registered with.
pub fn host_addr(&self) -> &HostAddr {
    &self.host_addr
}
158
/// Returns the worker type this client was registered as.
pub fn worker_type(&self) -> WorkerType {
    self.worker_type
}
162
/// Returns the cluster id received from the meta service at registration.
pub fn cluster_id(&self) -> &str {
    &self.cluster_id
}
166
167 pub async fn subscribe(
169 &self,
170 subscribe_type: SubscribeType,
171 ) -> Result<Streaming<SubscribeResponse>> {
172 let request = SubscribeRequest {
173 subscribe_type: subscribe_type as i32,
174 host: Some(self.host_addr.to_protobuf()),
175 worker_id: self.worker_id(),
176 };
177
178 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
179 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
180 true,
181 );
182
183 tokio_retry::Retry::spawn(retry_strategy, || async {
184 let request = request.clone();
185 self.inner.subscribe(request).await
186 })
187 .await
188 }
189
190 pub async fn create_connection(
191 &self,
192 connection_name: String,
193 database_id: DatabaseId,
194 schema_id: SchemaId,
195 owner_id: UserId,
196 req: create_connection_request::Payload,
197 ) -> Result<WaitVersion> {
198 let request = CreateConnectionRequest {
199 name: connection_name,
200 database_id,
201 schema_id,
202 owner_id,
203 payload: Some(req),
204 };
205 let resp = self.inner.create_connection(request).await?;
206 Ok(resp
207 .version
208 .ok_or_else(|| anyhow!("wait version not set"))?)
209 }
210
211 pub async fn create_secret(
212 &self,
213 secret_name: String,
214 database_id: DatabaseId,
215 schema_id: SchemaId,
216 owner_id: UserId,
217 value: Vec<u8>,
218 ) -> Result<WaitVersion> {
219 let request = CreateSecretRequest {
220 name: secret_name,
221 database_id,
222 schema_id,
223 owner_id,
224 value,
225 };
226 let resp = self.inner.create_secret(request).await?;
227 Ok(resp
228 .version
229 .ok_or_else(|| anyhow!("wait version not set"))?)
230 }
231
232 pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
233 let request = ListConnectionsRequest {};
234 let resp = self.inner.list_connections(request).await?;
235 Ok(resp.connections)
236 }
237
238 pub async fn drop_connection(
239 &self,
240 connection_id: ConnectionId,
241 cascade: bool,
242 ) -> Result<WaitVersion> {
243 let request = DropConnectionRequest {
244 connection_id,
245 cascade,
246 };
247 let resp = self.inner.drop_connection(request).await?;
248 Ok(resp
249 .version
250 .ok_or_else(|| anyhow!("wait version not set"))?)
251 }
252
253 pub async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<WaitVersion> {
254 let request = DropSecretRequest { secret_id, cascade };
255 let resp = self.inner.drop_secret(request).await?;
256 Ok(resp
257 .version
258 .ok_or_else(|| anyhow!("wait version not set"))?)
259 }
260
261 pub async fn register_new(
265 addr_strategy: MetaAddressStrategy,
266 worker_type: WorkerType,
267 addr: &HostAddr,
268 property: Property,
269 meta_config: Arc<MetaConfig>,
270 ) -> (Self, SystemParamsReader) {
271 let ret =
272 Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
273
274 match ret {
275 Ok(ret) => ret,
276 Err(err) => {
277 tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
278 std::process::exit(1);
279 }
280 }
281 }
282
283 async fn register_new_inner(
284 addr_strategy: MetaAddressStrategy,
285 worker_type: WorkerType,
286 addr: &HostAddr,
287 property: Property,
288 meta_config: Arc<MetaConfig>,
289 ) -> Result<(Self, SystemParamsReader)> {
290 tracing::info!("register meta client using strategy: {}", addr_strategy);
291
292 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
294 Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
295 true,
296 );
297
298 let init_result: Result<_> = tokio_retry::RetryIf::spawn(
299 retry_strategy,
300 || async {
301 let grpc_meta_client =
302 GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
303
304 let add_worker_resp = grpc_meta_client
305 .add_worker_node(AddWorkerNodeRequest {
306 worker_type: worker_type as i32,
307 host: Some(addr.to_protobuf()),
308 property: Some(property.clone()),
309 resource: Some(risingwave_pb::common::worker_node::Resource {
310 rw_version: RW_VERSION.to_owned(),
311 total_memory_bytes: system_memory_available_bytes() as _,
312 total_cpu_cores: total_cpu_available() as _,
313 hostname: hostname(),
314 }),
315 })
316 .await
317 .context("failed to add worker node")?;
318
319 let system_params_resp = grpc_meta_client
320 .get_system_params(GetSystemParamsRequest {})
321 .await
322 .context("failed to get initial system params")?;
323
324 Ok((add_worker_resp, system_params_resp, grpc_meta_client))
325 },
326 |e: &RpcError| !e.is_from_tonic_server_impl(),
329 )
330 .await;
331
332 let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
333 let worker_id = add_worker_resp
334 .node_id
335 .expect("AddWorkerNodeResponse::node_id is empty");
336
337 let meta_client = Self {
338 worker_id,
339 worker_type,
340 host_addr: addr.clone(),
341 inner: grpc_meta_client,
342 meta_config: meta_config.clone(),
343 cluster_id: add_worker_resp.cluster_id,
344 shutting_down: Arc::new(false.into()),
345 };
346
347 static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
348 REPORT_PANIC.call_once(|| {
349 let meta_client_clone = meta_client.clone();
350 std::panic::update_hook(move |default_hook, info| {
351 meta_client_clone.try_add_panic_event_blocking(info, None);
353 default_hook(info);
354 });
355 });
356
357 Ok((meta_client, system_params_resp.params.unwrap().into()))
358 }
359
360 pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
362 let request = ActivateWorkerNodeRequest {
363 host: Some(addr.to_protobuf()),
364 node_id: self.worker_id,
365 };
366 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
367 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
368 true,
369 );
370 tokio_retry::Retry::spawn(retry_strategy, || async {
371 let request = request.clone();
372 self.inner.activate_worker_node(request).await
373 })
374 .await?;
375
376 Ok(())
377 }
378
379 pub async fn send_heartbeat(&self) -> Result<()> {
381 let request = HeartbeatRequest {
382 node_id: self.worker_id,
383 resource: Some(risingwave_pb::common::worker_node::Resource {
384 rw_version: RW_VERSION.to_owned(),
385 total_memory_bytes: system_memory_available_bytes() as _,
386 total_cpu_cores: total_cpu_available() as _,
387 hostname: hostname(),
388 }),
389 };
390 let resp = self.inner.heartbeat(request).await?;
391 if let Some(status) = resp.status
392 && status.code() == risingwave_pb::common::status::Code::UnknownWorker
393 {
394 if !self.shutting_down.load(Relaxed) {
397 tracing::error!(message = status.message, "worker expired");
398 std::process::exit(1);
399 }
400 }
401 Ok(())
402 }
403
404 pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
405 let request = CreateDatabaseRequest { db: Some(db) };
406 let resp = self.inner.create_database(request).await?;
407 Ok(resp
409 .version
410 .ok_or_else(|| anyhow!("wait version not set"))?)
411 }
412
413 pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
414 let request = CreateSchemaRequest {
415 schema: Some(schema),
416 };
417 let resp = self.inner.create_schema(request).await?;
418 Ok(resp
420 .version
421 .ok_or_else(|| anyhow!("wait version not set"))?)
422 }
423
424 pub async fn create_materialized_view(
425 &self,
426 table: PbTable,
427 graph: StreamFragmentGraph,
428 dependencies: HashSet<ObjectId>,
429 resource_type: streaming_job_resource_type::ResourceType,
430 if_not_exists: bool,
431 refresh_interval_sec: Option<u64>,
432 ) -> Result<WaitVersion> {
433 let request = CreateMaterializedViewRequest {
434 materialized_view: Some(table),
435 fragment_graph: Some(graph),
436 resource_type: Some(PbStreamingJobResourceType {
437 resource_type: Some(resource_type),
438 }),
439 dependencies: dependencies.into_iter().collect(),
440 if_not_exists,
441 refresh_interval_sec,
442 };
443 let resp = self.inner.create_materialized_view(request).await?;
444 Ok(resp
446 .version
447 .ok_or_else(|| anyhow!("wait version not set"))?)
448 }
449
450 pub async fn drop_materialized_view(
451 &self,
452 table_id: TableId,
453 cascade: bool,
454 ) -> Result<WaitVersion> {
455 let request = DropMaterializedViewRequest { table_id, cascade };
456
457 let resp = self.inner.drop_materialized_view(request).await?;
458 Ok(resp
459 .version
460 .ok_or_else(|| anyhow!("wait version not set"))?)
461 }
462
463 pub async fn create_source(
464 &self,
465 source: PbSource,
466 graph: Option<StreamFragmentGraph>,
467 if_not_exists: bool,
468 ) -> Result<WaitVersion> {
469 let request = CreateSourceRequest {
470 source: Some(source),
471 fragment_graph: graph,
472 if_not_exists,
473 };
474
475 let resp = self.inner.create_source(request).await?;
476 Ok(resp
477 .version
478 .ok_or_else(|| anyhow!("wait version not set"))?)
479 }
480
481 pub async fn create_sink(
482 &self,
483 sink: PbSink,
484 graph: StreamFragmentGraph,
485 dependencies: HashSet<ObjectId>,
486 if_not_exists: bool,
487 ) -> Result<WaitVersion> {
488 let request = CreateSinkRequest {
489 sink: Some(sink),
490 fragment_graph: Some(graph),
491 dependencies: dependencies.into_iter().collect(),
492 if_not_exists,
493 };
494
495 let resp = self.inner.create_sink(request).await?;
496 Ok(resp
497 .version
498 .ok_or_else(|| anyhow!("wait version not set"))?)
499 }
500
501 pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
502 let request = CreateSubscriptionRequest {
503 subscription: Some(subscription),
504 };
505
506 let resp = self.inner.create_subscription(request).await?;
507 Ok(resp
508 .version
509 .ok_or_else(|| anyhow!("wait version not set"))?)
510 }
511
512 pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
513 let request = CreateFunctionRequest {
514 function: Some(function),
515 };
516 let resp = self.inner.create_function(request).await?;
517 Ok(resp
518 .version
519 .ok_or_else(|| anyhow!("wait version not set"))?)
520 }
521
522 pub async fn create_table(
523 &self,
524 source: Option<PbSource>,
525 table: PbTable,
526 graph: StreamFragmentGraph,
527 job_type: PbTableJobType,
528 if_not_exists: bool,
529 dependencies: HashSet<ObjectId>,
530 ) -> Result<WaitVersion> {
531 let request = CreateTableRequest {
532 materialized_view: Some(table),
533 fragment_graph: Some(graph),
534 source,
535 job_type: job_type as _,
536 if_not_exists,
537 dependencies: dependencies.into_iter().collect(),
538 };
539 let resp = self.inner.create_table(request).await?;
540 Ok(resp
542 .version
543 .ok_or_else(|| anyhow!("wait version not set"))?)
544 }
545
546 pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
547 let request = CommentOnRequest {
548 comment: Some(comment),
549 };
550 let resp = self.inner.comment_on(request).await?;
551 Ok(resp
552 .version
553 .ok_or_else(|| anyhow!("wait version not set"))?)
554 }
555
556 pub async fn alter_name(
557 &self,
558 object: alter_name_request::Object,
559 name: &str,
560 ) -> Result<WaitVersion> {
561 let request = AlterNameRequest {
562 object: Some(object),
563 new_name: name.to_owned(),
564 };
565 let resp = self.inner.alter_name(request).await?;
566 Ok(resp
567 .version
568 .ok_or_else(|| anyhow!("wait version not set"))?)
569 }
570
571 pub async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<WaitVersion> {
572 let request = AlterOwnerRequest {
573 object: Some(object),
574 owner_id,
575 };
576 let resp = self.inner.alter_owner(request).await?;
577 Ok(resp
578 .version
579 .ok_or_else(|| anyhow!("wait version not set"))?)
580 }
581
582 pub async fn alter_subscription_retention(
583 &self,
584 subscription_id: SubscriptionId,
585 retention_seconds: u64,
586 definition: String,
587 ) -> Result<WaitVersion> {
588 let request = AlterSubscriptionRetentionRequest {
589 subscription_id,
590 retention_seconds,
591 definition,
592 };
593 let resp = self.inner.alter_subscription_retention(request).await?;
594 Ok(resp
595 .version
596 .ok_or_else(|| anyhow!("wait version not set"))?)
597 }
598
599 pub async fn alter_database_param(
600 &self,
601 database_id: DatabaseId,
602 param: AlterDatabaseParam,
603 ) -> Result<WaitVersion> {
604 let request = match param {
605 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
606 let barrier_interval_ms = OptionalUint32 {
607 value: barrier_interval_ms,
608 };
609 AlterDatabaseParamRequest {
610 database_id,
611 param: Some(alter_database_param_request::Param::BarrierIntervalMs(
612 barrier_interval_ms,
613 )),
614 }
615 }
616 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
617 let checkpoint_frequency = OptionalUint64 {
618 value: checkpoint_frequency,
619 };
620 AlterDatabaseParamRequest {
621 database_id,
622 param: Some(alter_database_param_request::Param::CheckpointFrequency(
623 checkpoint_frequency,
624 )),
625 }
626 }
627 };
628 let resp = self.inner.alter_database_param(request).await?;
629 Ok(resp
630 .version
631 .ok_or_else(|| anyhow!("wait version not set"))?)
632 }
633
634 pub async fn alter_set_schema(
635 &self,
636 object: alter_set_schema_request::Object,
637 new_schema_id: SchemaId,
638 ) -> Result<WaitVersion> {
639 let request = AlterSetSchemaRequest {
640 new_schema_id,
641 object: Some(object),
642 };
643 let resp = self.inner.alter_set_schema(request).await?;
644 Ok(resp
645 .version
646 .ok_or_else(|| anyhow!("wait version not set"))?)
647 }
648
649 pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
650 let request = AlterSourceRequest {
651 source: Some(source),
652 };
653 let resp = self.inner.alter_source(request).await?;
654 Ok(resp
655 .version
656 .ok_or_else(|| anyhow!("wait version not set"))?)
657 }
658
659 pub async fn alter_parallelism(
660 &self,
661 job_id: JobId,
662 parallelism: PbTableParallelism,
663 adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
664 deferred: bool,
665 ) -> Result<()> {
666 let request = AlterParallelismRequest {
667 table_id: job_id,
668 parallelism: Some(parallelism),
669 deferred,
670 adaptive_parallelism_strategy: adaptive_parallelism_strategy
671 .map(|strategy| strategy.to_string()),
672 };
673
674 self.inner.alter_parallelism(request).await?;
675 Ok(())
676 }
677
678 pub async fn alter_backfill_parallelism(
679 &self,
680 job_id: JobId,
681 parallelism: Option<PbTableParallelism>,
682 adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
683 deferred: bool,
684 ) -> Result<()> {
685 let request = AlterBackfillParallelismRequest {
686 table_id: job_id,
687 parallelism,
688 deferred,
689 adaptive_parallelism_strategy: adaptive_parallelism_strategy
690 .map(|strategy| strategy.to_string()),
691 };
692
693 self.inner.alter_backfill_parallelism(request).await?;
694 Ok(())
695 }
696
697 pub async fn alter_streaming_job_config(
698 &self,
699 job_id: JobId,
700 entries_to_add: HashMap<String, String>,
701 keys_to_remove: Vec<String>,
702 ) -> Result<()> {
703 let request = AlterStreamingJobConfigRequest {
704 job_id,
705 entries_to_add,
706 keys_to_remove,
707 };
708
709 self.inner.alter_streaming_job_config(request).await?;
710 Ok(())
711 }
712
713 pub async fn alter_fragment_parallelism(
714 &self,
715 fragment_ids: Vec<FragmentId>,
716 parallelism: Option<PbTableParallelism>,
717 ) -> Result<()> {
718 let request = AlterFragmentParallelismRequest {
719 fragment_ids,
720 parallelism,
721 };
722
723 self.inner.alter_fragment_parallelism(request).await?;
724 Ok(())
725 }
726
727 pub async fn alter_cdc_table_backfill_parallelism(
728 &self,
729 table_id: JobId,
730 parallelism: PbTableParallelism,
731 ) -> Result<()> {
732 let request = AlterCdcTableBackfillParallelismRequest {
733 table_id,
734 parallelism: Some(parallelism),
735 };
736 self.inner
737 .alter_cdc_table_backfill_parallelism(request)
738 .await?;
739 Ok(())
740 }
741
742 pub async fn alter_resource_group(
743 &self,
744 table_id: TableId,
745 resource_group: Option<String>,
746 deferred: bool,
747 ) -> Result<()> {
748 let request = AlterResourceGroupRequest {
749 table_id,
750 resource_group,
751 deferred,
752 };
753
754 self.inner.alter_resource_group(request).await?;
755 Ok(())
756 }
757
758 pub async fn alter_swap_rename(
759 &self,
760 object: alter_swap_rename_request::Object,
761 ) -> Result<WaitVersion> {
762 let request = AlterSwapRenameRequest {
763 object: Some(object),
764 };
765 let resp = self.inner.alter_swap_rename(request).await?;
766 Ok(resp
767 .version
768 .ok_or_else(|| anyhow!("wait version not set"))?)
769 }
770
771 pub async fn alter_secret(
772 &self,
773 secret_id: SecretId,
774 secret_name: String,
775 database_id: DatabaseId,
776 schema_id: SchemaId,
777 owner_id: UserId,
778 value: Vec<u8>,
779 ) -> Result<WaitVersion> {
780 let request = AlterSecretRequest {
781 secret_id,
782 name: secret_name,
783 database_id,
784 schema_id,
785 owner_id,
786 value,
787 };
788 let resp = self.inner.alter_secret(request).await?;
789 Ok(resp
790 .version
791 .ok_or_else(|| anyhow!("wait version not set"))?)
792 }
793
794 pub async fn replace_job(
795 &self,
796 graph: StreamFragmentGraph,
797 replace_job: ReplaceJob,
798 ) -> Result<WaitVersion> {
799 let request = ReplaceJobPlanRequest {
800 plan: Some(ReplaceJobPlan {
801 fragment_graph: Some(graph),
802 replace_job: Some(replace_job),
803 }),
804 };
805 let resp = self.inner.replace_job_plan(request).await?;
806 Ok(resp
808 .version
809 .ok_or_else(|| anyhow!("wait version not set"))?)
810 }
811
812 pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
813 let request = AutoSchemaChangeRequest {
814 schema_change: Some(schema_change),
815 };
816 let _ = self.inner.auto_schema_change(request).await?;
817 Ok(())
818 }
819
820 pub async fn create_view(
821 &self,
822 view: PbView,
823 dependencies: HashSet<ObjectId>,
824 ) -> Result<WaitVersion> {
825 let request = CreateViewRequest {
826 view: Some(view),
827 dependencies: dependencies.into_iter().collect(),
828 };
829 let resp = self.inner.create_view(request).await?;
830 Ok(resp
832 .version
833 .ok_or_else(|| anyhow!("wait version not set"))?)
834 }
835
836 pub async fn create_index(
837 &self,
838 index: PbIndex,
839 table: PbTable,
840 graph: StreamFragmentGraph,
841 if_not_exists: bool,
842 ) -> Result<WaitVersion> {
843 let request = CreateIndexRequest {
844 index: Some(index),
845 index_table: Some(table),
846 fragment_graph: Some(graph),
847 if_not_exists,
848 };
849 let resp = self.inner.create_index(request).await?;
850 Ok(resp
852 .version
853 .ok_or_else(|| anyhow!("wait version not set"))?)
854 }
855
856 pub async fn drop_table(
857 &self,
858 source_id: Option<SourceId>,
859 table_id: TableId,
860 cascade: bool,
861 ) -> Result<WaitVersion> {
862 let request = DropTableRequest {
863 source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
864 table_id,
865 cascade,
866 };
867
868 let resp = self.inner.drop_table(request).await?;
869 Ok(resp
870 .version
871 .ok_or_else(|| anyhow!("wait version not set"))?)
872 }
873
874 pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
875 let request = CompactIcebergTableRequest { sink_id };
876 let resp = self.inner.compact_iceberg_table(request).await?;
877 Ok(resp.task_id)
878 }
879
880 pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
881 let request = ExpireIcebergTableSnapshotsRequest { sink_id };
882 let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
883 Ok(())
884 }
885
886 pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
887 let request = DropViewRequest { view_id, cascade };
888 let resp = self.inner.drop_view(request).await?;
889 Ok(resp
890 .version
891 .ok_or_else(|| anyhow!("wait version not set"))?)
892 }
893
894 pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
895 let request = DropSourceRequest { source_id, cascade };
896 let resp = self.inner.drop_source(request).await?;
897 Ok(resp
898 .version
899 .ok_or_else(|| anyhow!("wait version not set"))?)
900 }
901
902 pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
903 let request = ResetSourceRequest { source_id };
904 let resp = self.inner.reset_source(request).await?;
905 Ok(resp
906 .version
907 .ok_or_else(|| anyhow!("wait version not set"))?)
908 }
909
910 pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
911 let request = DropSinkRequest { sink_id, cascade };
912 let resp = self.inner.drop_sink(request).await?;
913 Ok(resp
914 .version
915 .ok_or_else(|| anyhow!("wait version not set"))?)
916 }
917
918 pub async fn drop_subscription(
919 &self,
920 subscription_id: SubscriptionId,
921 cascade: bool,
922 ) -> Result<WaitVersion> {
923 let request = DropSubscriptionRequest {
924 subscription_id,
925 cascade,
926 };
927 let resp = self.inner.drop_subscription(request).await?;
928 Ok(resp
929 .version
930 .ok_or_else(|| anyhow!("wait version not set"))?)
931 }
932
933 pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
934 let request = DropIndexRequest { index_id, cascade };
935 let resp = self.inner.drop_index(request).await?;
936 Ok(resp
937 .version
938 .ok_or_else(|| anyhow!("wait version not set"))?)
939 }
940
941 pub async fn drop_function(
942 &self,
943 function_id: FunctionId,
944 cascade: bool,
945 ) -> Result<WaitVersion> {
946 let request = DropFunctionRequest {
947 function_id,
948 cascade,
949 };
950 let resp = self.inner.drop_function(request).await?;
951 Ok(resp
952 .version
953 .ok_or_else(|| anyhow!("wait version not set"))?)
954 }
955
956 pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
957 let request = DropDatabaseRequest { database_id };
958 let resp = self.inner.drop_database(request).await?;
959 Ok(resp
960 .version
961 .ok_or_else(|| anyhow!("wait version not set"))?)
962 }
963
964 pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
965 let request = DropSchemaRequest { schema_id, cascade };
966 let resp = self.inner.drop_schema(request).await?;
967 Ok(resp
968 .version
969 .ok_or_else(|| anyhow!("wait version not set"))?)
970 }
971
972 pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
974 let request = CreateUserRequest { user: Some(user) };
975 let resp = self.inner.create_user(request).await?;
976 Ok(resp.version)
977 }
978
979 pub async fn drop_user(&self, user_id: UserId) -> Result<u64> {
980 let request = DropUserRequest { user_id };
981 let resp = self.inner.drop_user(request).await?;
982 Ok(resp.version)
983 }
984
985 pub async fn update_user(
986 &self,
987 user: UserInfo,
988 update_fields: Vec<UpdateField>,
989 ) -> Result<u64> {
990 let request = UpdateUserRequest {
991 user: Some(user),
992 update_fields: update_fields
993 .into_iter()
994 .map(|field| field as i32)
995 .collect::<Vec<_>>(),
996 };
997 let resp = self.inner.update_user(request).await?;
998 Ok(resp.version)
999 }
1000
1001 pub async fn grant_privilege(
1002 &self,
1003 user_ids: Vec<UserId>,
1004 privileges: Vec<GrantPrivilege>,
1005 with_grant_option: bool,
1006 granted_by: UserId,
1007 ) -> Result<u64> {
1008 let request = GrantPrivilegeRequest {
1009 user_ids,
1010 privileges,
1011 with_grant_option,
1012 granted_by,
1013 };
1014 let resp = self.inner.grant_privilege(request).await?;
1015 Ok(resp.version)
1016 }
1017
1018 pub async fn revoke_privilege(
1019 &self,
1020 user_ids: Vec<UserId>,
1021 privileges: Vec<GrantPrivilege>,
1022 granted_by: UserId,
1023 revoke_by: UserId,
1024 revoke_grant_option: bool,
1025 cascade: bool,
1026 ) -> Result<u64> {
1027 let request = RevokePrivilegeRequest {
1028 user_ids,
1029 privileges,
1030 granted_by,
1031 revoke_by,
1032 revoke_grant_option,
1033 cascade,
1034 };
1035 let resp = self.inner.revoke_privilege(request).await?;
1036 Ok(resp.version)
1037 }
1038
1039 pub async fn alter_default_privilege(
1040 &self,
1041 user_ids: Vec<UserId>,
1042 database_id: DatabaseId,
1043 schema_ids: Vec<SchemaId>,
1044 operation: AlterDefaultPrivilegeOperation,
1045 granted_by: UserId,
1046 ) -> Result<()> {
1047 let request = AlterDefaultPrivilegeRequest {
1048 user_ids,
1049 database_id,
1050 schema_ids,
1051 operation: Some(operation),
1052 granted_by,
1053 };
1054 self.inner.alter_default_privilege(request).await?;
1055 Ok(())
1056 }
1057
1058 pub async fn unregister(&self) -> Result<()> {
1060 let request = DeleteWorkerNodeRequest {
1061 host: Some(self.host_addr.to_protobuf()),
1062 };
1063 self.inner.delete_worker_node(request).await?;
1064 self.shutting_down.store(true, Relaxed);
1065 Ok(())
1066 }
1067
1068 pub async fn try_unregister(&self) {
1070 match self.unregister().await {
1071 Ok(_) => {
1072 tracing::info!(
1073 worker_id = %self.worker_id(),
1074 "successfully unregistered from meta service",
1075 )
1076 }
1077 Err(e) => {
1078 tracing::warn!(
1079 error = %e.as_report(),
1080 worker_id = %self.worker_id(),
1081 "failed to unregister from meta service",
1082 );
1083 }
1084 }
1085 }
1086
1087 pub async fn list_worker_nodes(
1088 &self,
1089 worker_type: Option<WorkerType>,
1090 ) -> Result<Vec<WorkerNode>> {
1091 let request = ListAllNodesRequest {
1092 worker_type: worker_type.map(Into::into),
1093 include_starting_nodes: true,
1094 };
1095 let resp = self.inner.list_all_nodes(request).await?;
1096 Ok(resp.nodes)
1097 }
1098
1099 pub fn start_heartbeat_loop(
1101 meta_client: MetaClient,
1102 min_interval: Duration,
1103 ) -> (JoinHandle<()>, Sender<()>) {
1104 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1105 let join_handle = tokio::spawn(async move {
1106 let mut min_interval_ticker = tokio::time::interval(min_interval);
1107 loop {
1108 tokio::select! {
1109 biased;
1110 _ = &mut shutdown_rx => {
1112 tracing::info!("Heartbeat loop is stopped");
1113 return;
1114 }
1115 _ = min_interval_ticker.tick() => {},
1117 }
1118 tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
1119 match tokio::time::timeout(
1120 min_interval * 3,
1122 meta_client.send_heartbeat(),
1123 )
1124 .await
1125 {
1126 Ok(Ok(_)) => {}
1127 Ok(Err(err)) => {
1128 tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
1129 }
1130 Err(_) => {
1131 tracing::warn!("Failed to send_heartbeat: timeout");
1132 }
1133 }
1134 }
1135 });
1136 (join_handle, shutdown_tx)
1137 }
1138
1139 pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1140 let request = RisectlListStateTablesRequest {};
1141 let resp = self.inner.risectl_list_state_tables(request).await?;
1142 Ok(resp.tables)
1143 }
1144
1145 pub async fn risectl_resume_backfill(
1146 &self,
1147 request: RisectlResumeBackfillRequest,
1148 ) -> Result<()> {
1149 self.inner.risectl_resume_backfill(request).await?;
1150 Ok(())
1151 }
1152
1153 pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1154 let request = FlushRequest { database_id };
1155 let resp = self.inner.flush(request).await?;
1156 Ok(resp.hummock_version_id)
1157 }
1158
1159 pub async fn wait(&self, job_id: Option<JobId>) -> Result<WaitVersion> {
1160 let request = WaitRequest { job_id };
1161 let resp = self.inner.wait(request).await?;
1162 Ok(resp
1163 .version
1164 .ok_or_else(|| anyhow!("wait version not set"))?)
1165 }
1166
1167 pub async fn recover(&self) -> Result<()> {
1168 let request = RecoverRequest {};
1169 self.inner.recover(request).await?;
1170 Ok(())
1171 }
1172
1173 pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1174 let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1175 let resp = self.inner.cancel_creating_jobs(request).await?;
1176 Ok(resp.canceled_jobs)
1177 }
1178
1179 pub async fn list_table_fragments(
1180 &self,
1181 job_ids: &[JobId],
1182 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
1183 let request = ListTableFragmentsRequest {
1184 table_ids: job_ids.to_vec(),
1185 };
1186 let resp = self.inner.list_table_fragments(request).await?;
1187 Ok(resp.table_fragments)
1188 }
1189
1190 pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1191 let resp = self
1192 .inner
1193 .list_streaming_job_states(ListStreamingJobStatesRequest {})
1194 .await?;
1195 Ok(resp.states)
1196 }
1197
1198 pub async fn list_fragment_distributions(
1199 &self,
1200 include_node: bool,
1201 ) -> Result<Vec<FragmentDistribution>> {
1202 let resp = self
1203 .inner
1204 .list_fragment_distribution(ListFragmentDistributionRequest {
1205 include_node: Some(include_node),
1206 })
1207 .await?;
1208 Ok(resp.distributions)
1209 }
1210
1211 pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
1212 let resp = self
1213 .inner
1214 .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {
1215 include_node: Some(true),
1216 })
1217 .await?;
1218 Ok(resp.distributions)
1219 }
1220
1221 pub async fn get_fragment_by_id(
1222 &self,
1223 fragment_id: FragmentId,
1224 ) -> Result<Option<FragmentDistribution>> {
1225 let resp = self
1226 .inner
1227 .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1228 .await?;
1229 Ok(resp.distribution)
1230 }
1231
1232 pub async fn get_fragment_vnodes(
1233 &self,
1234 fragment_id: FragmentId,
1235 ) -> Result<Vec<(ActorId, Vec<u32>)>> {
1236 let resp = self
1237 .inner
1238 .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
1239 .await?;
1240 Ok(resp
1241 .actor_vnodes
1242 .into_iter()
1243 .map(|actor| (actor.actor_id, actor.vnode_indices))
1244 .collect())
1245 }
1246
1247 pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
1248 let resp = self
1249 .inner
1250 .get_actor_vnodes(GetActorVnodesRequest { actor_id })
1251 .await?;
1252 Ok(resp.vnode_indices)
1253 }
1254
1255 pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1256 let resp = self
1257 .inner
1258 .list_actor_states(ListActorStatesRequest {})
1259 .await?;
1260 Ok(resp.states)
1261 }
1262
1263 pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1264 let resp = self
1265 .inner
1266 .list_actor_splits(ListActorSplitsRequest {})
1267 .await?;
1268
1269 Ok(resp.actor_splits)
1270 }
1271
1272 pub async fn pause(&self) -> Result<PauseResponse> {
1273 let request = PauseRequest {};
1274 let resp = self.inner.pause(request).await?;
1275 Ok(resp)
1276 }
1277
1278 pub async fn resume(&self) -> Result<ResumeResponse> {
1279 let request = ResumeRequest {};
1280 let resp = self.inner.resume(request).await?;
1281 Ok(resp)
1282 }
1283
1284 pub async fn apply_throttle(
1285 &self,
1286 throttle_target: PbThrottleTarget,
1287 throttle_type: risingwave_pb::common::PbThrottleType,
1288 id: u32,
1289 rate: Option<u32>,
1290 ) -> Result<ApplyThrottleResponse> {
1291 let request = ApplyThrottleRequest {
1292 throttle_target: throttle_target as i32,
1293 throttle_type: throttle_type as i32,
1294 id,
1295 rate,
1296 };
1297 let resp = self.inner.apply_throttle(request).await?;
1298 Ok(resp)
1299 }
1300
1301 pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1302 let resp = self
1303 .inner
1304 .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1305 .await?;
1306 Ok(resp.get_status().unwrap())
1307 }
1308
1309 pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1310 let request = GetClusterInfoRequest {};
1311 let resp = self.inner.get_cluster_info(request).await?;
1312 Ok(resp)
1313 }
1314
1315 pub async fn reschedule(
1316 &self,
1317 worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1318 revision: u64,
1319 resolve_no_shuffle_upstream: bool,
1320 ) -> Result<(bool, u64)> {
1321 let request = RescheduleRequest {
1322 revision,
1323 resolve_no_shuffle_upstream,
1324 worker_reschedules,
1325 };
1326 let resp = self.inner.reschedule(request).await?;
1327 Ok((resp.success, resp.revision))
1328 }
1329
1330 pub async fn risectl_get_pinned_versions_summary(
1331 &self,
1332 ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1333 let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1334 self.inner
1335 .rise_ctl_get_pinned_versions_summary(request)
1336 .await
1337 }
1338
1339 pub async fn risectl_get_checkpoint_hummock_version(
1340 &self,
1341 ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1342 let request = RiseCtlGetCheckpointVersionRequest {};
1343 self.inner.rise_ctl_get_checkpoint_version(request).await
1344 }
1345
1346 pub async fn risectl_pause_hummock_version_checkpoint(
1347 &self,
1348 ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1349 let request = RiseCtlPauseVersionCheckpointRequest {};
1350 self.inner.rise_ctl_pause_version_checkpoint(request).await
1351 }
1352
1353 pub async fn risectl_resume_hummock_version_checkpoint(
1354 &self,
1355 ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1356 let request = RiseCtlResumeVersionCheckpointRequest {};
1357 self.inner.rise_ctl_resume_version_checkpoint(request).await
1358 }
1359
1360 pub async fn init_metadata_for_replay(
1361 &self,
1362 tables: Vec<PbTable>,
1363 compaction_groups: Vec<CompactionGroupInfo>,
1364 ) -> Result<()> {
1365 let req = InitMetadataForReplayRequest {
1366 tables,
1367 compaction_groups,
1368 };
1369 let _resp = self.inner.init_metadata_for_replay(req).await?;
1370 Ok(())
1371 }
1372
1373 pub async fn replay_version_delta(
1374 &self,
1375 version_delta: HummockVersionDelta,
1376 ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1377 let req = ReplayVersionDeltaRequest {
1378 version_delta: Some(version_delta.into()),
1379 };
1380 let resp = self.inner.replay_version_delta(req).await?;
1381 Ok((
1382 HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1383 resp.modified_compaction_groups,
1384 ))
1385 }
1386
1387 pub async fn list_version_deltas(
1388 &self,
1389 start_id: HummockVersionId,
1390 num_limit: u32,
1391 committed_epoch_limit: HummockEpoch,
1392 ) -> Result<Vec<HummockVersionDelta>> {
1393 let req = ListVersionDeltasRequest {
1394 start_id,
1395 num_limit,
1396 committed_epoch_limit,
1397 };
1398 Ok(self
1399 .inner
1400 .list_version_deltas(req)
1401 .await?
1402 .version_deltas
1403 .unwrap()
1404 .version_deltas
1405 .iter()
1406 .map(HummockVersionDelta::from_rpc_protobuf)
1407 .collect())
1408 }
1409
1410 pub async fn trigger_compaction_deterministic(
1411 &self,
1412 version_id: HummockVersionId,
1413 compaction_groups: Vec<CompactionGroupId>,
1414 ) -> Result<()> {
1415 let req = TriggerCompactionDeterministicRequest {
1416 version_id,
1417 compaction_groups,
1418 };
1419 self.inner.trigger_compaction_deterministic(req).await?;
1420 Ok(())
1421 }
1422
1423 pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1424 let req = DisableCommitEpochRequest {};
1425 Ok(HummockVersion::from_rpc_protobuf(
1426 &self
1427 .inner
1428 .disable_commit_epoch(req)
1429 .await?
1430 .current_version
1431 .unwrap(),
1432 ))
1433 }
1434
1435 pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1436 let req = GetAssignedCompactTaskNumRequest {};
1437 let resp = self.inner.get_assigned_compact_task_num(req).await?;
1438 Ok(resp.num_tasks as usize)
1439 }
1440
1441 pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1442 let req = RiseCtlListCompactionGroupRequest {};
1443 let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1444 Ok(resp.compaction_groups)
1445 }
1446
1447 pub async fn risectl_update_compaction_config(
1448 &self,
1449 compaction_groups: &[CompactionGroupId],
1450 configs: &[MutableConfig],
1451 ) -> Result<()> {
1452 let req = RiseCtlUpdateCompactionConfigRequest {
1453 compaction_group_ids: compaction_groups.to_vec(),
1454 configs: configs
1455 .iter()
1456 .map(
1457 |c| rise_ctl_update_compaction_config_request::MutableConfig {
1458 mutable_config: Some(c.clone()),
1459 },
1460 )
1461 .collect(),
1462 };
1463 let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1464 Ok(())
1465 }
1466
1467 pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1468 let req = BackupMetaRequest { remarks };
1469 let resp = self.inner.backup_meta(req).await?;
1470 Ok(resp.job_id)
1471 }
1472
1473 pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1474 let req = GetBackupJobStatusRequest { job_id };
1475 let resp = self.inner.get_backup_job_status(req).await?;
1476 Ok((resp.job_status(), resp.message))
1477 }
1478
1479 pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1480 let req = DeleteMetaSnapshotRequest {
1481 snapshot_ids: snapshot_ids.to_vec(),
1482 };
1483 let _resp = self.inner.delete_meta_snapshot(req).await?;
1484 Ok(())
1485 }
1486
1487 pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1488 let req = GetMetaSnapshotManifestRequest {};
1489 let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1490 Ok(resp.manifest.expect("should exist"))
1491 }
1492
1493 pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1494 let req = GetTelemetryInfoRequest {};
1495 let resp = self.inner.get_telemetry_info(req).await?;
1496 Ok(resp)
1497 }
1498
1499 pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1500 let req = GetMetaStoreInfoRequest {};
1501 let resp = self.inner.get_meta_store_info(req).await?;
1502 Ok(resp.meta_store_endpoint)
1503 }
1504
1505 pub async fn alter_sink_props(
1506 &self,
1507 sink_id: SinkId,
1508 changed_props: BTreeMap<String, String>,
1509 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1510 connector_conn_ref: Option<ConnectionId>,
1511 ) -> Result<()> {
1512 let req = AlterConnectorPropsRequest {
1513 object_id: sink_id.as_raw_id(),
1514 changed_props: changed_props.into_iter().collect(),
1515 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1516 connector_conn_ref,
1517 object_type: AlterConnectorPropsObject::Sink as i32,
1518 extra_options: None,
1519 };
1520 let _resp = self.inner.alter_connector_props(req).await?;
1521 Ok(())
1522 }
1523
1524 pub async fn alter_iceberg_table_props(
1525 &self,
1526 table_id: TableId,
1527 sink_id: SinkId,
1528 source_id: SourceId,
1529 changed_props: BTreeMap<String, String>,
1530 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1531 connector_conn_ref: Option<ConnectionId>,
1532 ) -> Result<()> {
1533 let req = AlterConnectorPropsRequest {
1534 object_id: table_id.as_raw_id(),
1535 changed_props: changed_props.into_iter().collect(),
1536 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1537 connector_conn_ref,
1538 object_type: AlterConnectorPropsObject::IcebergTable as i32,
1539 extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1540 sink_id,
1541 source_id,
1542 })),
1543 };
1544 let _resp = self.inner.alter_connector_props(req).await?;
1545 Ok(())
1546 }
1547
1548 pub async fn alter_source_connector_props(
1549 &self,
1550 source_id: SourceId,
1551 changed_props: BTreeMap<String, String>,
1552 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1553 connector_conn_ref: Option<ConnectionId>,
1554 ) -> Result<()> {
1555 let req = AlterConnectorPropsRequest {
1556 object_id: source_id.as_raw_id(),
1557 changed_props: changed_props.into_iter().collect(),
1558 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1559 connector_conn_ref,
1560 object_type: AlterConnectorPropsObject::Source as i32,
1561 extra_options: None,
1562 };
1563 let _resp = self.inner.alter_connector_props(req).await?;
1564 Ok(())
1565 }
1566
1567 pub async fn alter_connection_connector_props(
1568 &self,
1569 connection_id: u32,
1570 changed_props: BTreeMap<String, String>,
1571 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1572 ) -> Result<()> {
1573 let req = AlterConnectorPropsRequest {
1574 object_id: connection_id,
1575 changed_props: changed_props.into_iter().collect(),
1576 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1577 connector_conn_ref: None, object_type: AlterConnectorPropsObject::Connection as i32,
1579 extra_options: None,
1580 };
1581 let _resp = self.inner.alter_connector_props(req).await?;
1582 Ok(())
1583 }
1584
1585 pub async fn alter_source_properties_safe(
1588 &self,
1589 source_id: SourceId,
1590 changed_props: BTreeMap<String, String>,
1591 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1592 reset_splits: bool,
1593 ) -> Result<()> {
1594 let req = AlterSourcePropertiesSafeRequest {
1595 source_id: source_id.as_raw_id(),
1596 changed_props: changed_props.into_iter().collect(),
1597 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1598 options: Some(PropertyUpdateOptions { reset_splits }),
1599 };
1600 let _resp = self.inner.alter_source_properties_safe(req).await?;
1601 Ok(())
1602 }
1603
1604 pub async fn reset_source_splits(&self, source_id: SourceId) -> Result<()> {
1607 let req = ResetSourceSplitsRequest {
1608 source_id: source_id.as_raw_id(),
1609 };
1610 let _resp = self.inner.reset_source_splits(req).await?;
1611 Ok(())
1612 }
1613
1614 pub async fn inject_source_offsets(
1617 &self,
1618 source_id: SourceId,
1619 split_offsets: HashMap<String, String>,
1620 ) -> Result<Vec<String>> {
1621 let req = InjectSourceOffsetsRequest {
1622 source_id: source_id.as_raw_id(),
1623 split_offsets,
1624 };
1625 let resp = self.inner.inject_source_offsets(req).await?;
1626 Ok(resp.applied_split_ids)
1627 }
1628
1629 pub async fn set_system_param(
1630 &self,
1631 param: String,
1632 value: Option<String>,
1633 ) -> Result<Option<SystemParamsReader>> {
1634 let req = SetSystemParamRequest { param, value };
1635 let resp = self.inner.set_system_param(req).await?;
1636 Ok(resp.params.map(SystemParamsReader::from))
1637 }
1638
1639 pub async fn get_session_params(&self) -> Result<String> {
1640 let req = GetSessionParamsRequest {};
1641 let resp = self.inner.get_session_params(req).await?;
1642 Ok(resp.params)
1643 }
1644
1645 pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1646 let req = SetSessionParamRequest { param, value };
1647 let resp = self.inner.set_session_param(req).await?;
1648 Ok(resp.param)
1649 }
1650
1651 pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1652 let req = GetDdlProgressRequest {};
1653 let resp = self.inner.get_ddl_progress(req).await?;
1654 Ok(resp.ddl_progress)
1655 }
1656
1657 pub async fn split_compaction_group(
1658 &self,
1659 group_id: CompactionGroupId,
1660 table_ids_to_new_group: &[TableId],
1661 partition_vnode_count: u32,
1662 ) -> Result<CompactionGroupId> {
1663 let req = SplitCompactionGroupRequest {
1664 group_id,
1665 table_ids: table_ids_to_new_group.to_vec(),
1666 partition_vnode_count,
1667 };
1668 let resp = self.inner.split_compaction_group(req).await?;
1669 Ok(resp.new_group_id)
1670 }
1671
1672 pub async fn get_tables(
1673 &self,
1674 table_ids: Vec<TableId>,
1675 include_dropped_tables: bool,
1676 ) -> Result<HashMap<TableId, Table>> {
1677 let req = GetTablesRequest {
1678 table_ids,
1679 include_dropped_tables,
1680 };
1681 let resp = self.inner.get_tables(req).await?;
1682 Ok(resp.tables)
1683 }
1684
1685 pub async fn list_serving_vnode_mappings(
1686 &self,
1687 ) -> Result<HashMap<FragmentId, (JobId, WorkerSlotMapping)>> {
1688 let req = GetServingVnodeMappingsRequest {};
1689 let resp = self.inner.get_serving_vnode_mappings(req).await?;
1690 let mappings = resp
1691 .worker_slot_mappings
1692 .into_iter()
1693 .map(|p| {
1694 (
1695 p.fragment_id,
1696 (
1697 resp.fragment_to_table
1698 .get(&p.fragment_id)
1699 .cloned()
1700 .unwrap_or_default(),
1701 WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1702 ),
1703 )
1704 })
1705 .collect();
1706 Ok(mappings)
1707 }
1708
1709 pub async fn risectl_list_compaction_status(
1710 &self,
1711 ) -> Result<(
1712 Vec<CompactStatus>,
1713 Vec<CompactTaskAssignment>,
1714 Vec<CompactTaskProgress>,
1715 )> {
1716 let req = RiseCtlListCompactionStatusRequest {};
1717 let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1718 Ok((
1719 resp.compaction_statuses,
1720 resp.task_assignment,
1721 resp.task_progress,
1722 ))
1723 }
1724
1725 pub async fn get_compaction_score(
1726 &self,
1727 compaction_group_id: CompactionGroupId,
1728 ) -> Result<Vec<PickerInfo>> {
1729 let req = GetCompactionScoreRequest {
1730 compaction_group_id,
1731 };
1732 let resp = self.inner.get_compaction_score(req).await?;
1733 Ok(resp.scores)
1734 }
1735
1736 pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1737 let req = RiseCtlRebuildTableStatsRequest {};
1738 let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1739 Ok(())
1740 }
1741
1742 pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1743 let req = ListBranchedObjectRequest {};
1744 let resp = self.inner.list_branched_object(req).await?;
1745 Ok(resp.branched_objects)
1746 }
1747
1748 pub async fn list_active_write_limit(&self) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
1749 let req = ListActiveWriteLimitRequest {};
1750 let resp = self.inner.list_active_write_limit(req).await?;
1751 Ok(resp.write_limits)
1752 }
1753
1754 pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1755 let req = ListHummockMetaConfigRequest {};
1756 let resp = self.inner.list_hummock_meta_config(req).await?;
1757 Ok(resp.configs)
1758 }
1759
1760 pub async fn get_table_change_logs(
1761 &self,
1762 epoch_only: bool,
1763 start_epoch_inclusive: Option<u64>,
1764 end_epoch_inclusive: Option<u64>,
1765 table_ids: Option<HashSet<TableId>>,
1766 exclude_empty: bool,
1767 limit: Option<u32>,
1768 ) -> Result<TableChangeLogs> {
1769 let req = GetTableChangeLogsRequest {
1770 epoch_only,
1771 start_epoch_inclusive,
1772 end_epoch_inclusive,
1773 table_ids: table_ids.map(|iter| PbTableFilter {
1774 table_ids: iter.into_iter().collect::<Vec<_>>(),
1775 }),
1776 exclude_empty,
1777 limit,
1778 };
1779 let resp = self.inner.get_table_change_logs(req).await?;
1780 Ok(resp
1781 .table_change_logs
1782 .into_iter()
1783 .map(|(id, change_log)| (TableId::new(id), TableChangeLog::from_protobuf(&change_log)))
1784 .collect())
1785 }
1786
1787 pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1788 let _resp = self
1789 .inner
1790 .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1791 .await?;
1792
1793 Ok(())
1794 }
1795
1796 pub async fn rw_cloud_validate_source(
1797 &self,
1798 source_type: SourceType,
1799 source_config: HashMap<String, String>,
1800 ) -> Result<RwCloudValidateSourceResponse> {
1801 let req = RwCloudValidateSourceRequest {
1802 source_type: source_type.into(),
1803 source_config,
1804 };
1805 let resp = self.inner.rw_cloud_validate_source(req).await?;
1806 Ok(resp)
1807 }
1808
1809 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1810 self.inner.core.read().await.sink_coordinate_client.clone()
1811 }
1812
1813 pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1814 let req = ListCompactTaskAssignmentRequest {};
1815 let resp = self.inner.list_compact_task_assignment(req).await?;
1816 Ok(resp.task_assignment)
1817 }
1818
1819 pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1820 let req = ListEventLogRequest::default();
1821 let resp = self.inner.list_event_log(req).await?;
1822 Ok(resp.event_logs)
1823 }
1824
1825 pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1826 let req = ListCompactTaskProgressRequest {};
1827 let resp = self.inner.list_compact_task_progress(req).await?;
1828 Ok(resp.task_progress)
1829 }
1830
1831 #[cfg(madsim)]
1832 pub fn try_add_panic_event_blocking(
1833 &self,
1834 panic_info: impl Display,
1835 timeout_millis: Option<u64>,
1836 ) {
1837 }
1838
1839 #[cfg(not(madsim))]
1841 pub fn try_add_panic_event_blocking(
1842 &self,
1843 panic_info: impl Display,
1844 timeout_millis: Option<u64>,
1845 ) {
1846 let event = event_log::EventWorkerNodePanic {
1847 worker_id: self.worker_id,
1848 worker_type: self.worker_type.into(),
1849 host_addr: Some(self.host_addr.to_protobuf()),
1850 panic_info: format!("{panic_info}"),
1851 };
1852 let grpc_meta_client = self.inner.clone();
1853 let _ = thread::spawn(move || {
1854 let rt = tokio::runtime::Builder::new_current_thread()
1855 .enable_all()
1856 .build()
1857 .unwrap();
1858 let req = AddEventLogRequest {
1859 event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1860 };
1861 rt.block_on(async {
1862 let _ = tokio::time::timeout(
1863 Duration::from_millis(timeout_millis.unwrap_or(1000)),
1864 grpc_meta_client.add_event_log(req),
1865 )
1866 .await;
1867 });
1868 })
1869 .join();
1870 }
1871
1872 pub async fn add_sink_fail_evet(
1873 &self,
1874 sink_id: SinkId,
1875 sink_name: String,
1876 connector: String,
1877 error: String,
1878 ) -> Result<()> {
1879 let event = event_log::EventSinkFail {
1880 sink_id,
1881 sink_name,
1882 connector,
1883 error,
1884 };
1885 let req = AddEventLogRequest {
1886 event: Some(add_event_log_request::Event::SinkFail(event)),
1887 };
1888 self.inner.add_event_log(req).await?;
1889 Ok(())
1890 }
1891
1892 pub async fn add_cdc_auto_schema_change_fail_event(
1893 &self,
1894 source_id: SourceId,
1895 table_name: String,
1896 cdc_table_id: String,
1897 upstream_ddl: String,
1898 fail_info: String,
1899 ) -> Result<()> {
1900 let event = event_log::EventAutoSchemaChangeFail {
1901 table_id: source_id.as_cdc_table_id(),
1902 table_name,
1903 cdc_table_id,
1904 upstream_ddl,
1905 fail_info,
1906 };
1907 let req = AddEventLogRequest {
1908 event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1909 };
1910 self.inner.add_event_log(req).await?;
1911 Ok(())
1912 }
1913
1914 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1915 let req = CancelCompactTaskRequest {
1916 task_id,
1917 task_status: task_status as _,
1918 };
1919 let resp = self.inner.cancel_compact_task(req).await?;
1920 Ok(resp.ret)
1921 }
1922
1923 pub async fn get_version_by_epoch(
1924 &self,
1925 epoch: HummockEpoch,
1926 table_id: TableId,
1927 ) -> Result<PbHummockVersion> {
1928 let req = GetVersionByEpochRequest { epoch, table_id };
1929 let resp = self.inner.get_version_by_epoch(req).await?;
1930 Ok(resp.version.unwrap())
1931 }
1932
1933 pub async fn get_cluster_limits(
1934 &self,
1935 ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1936 let req = GetClusterLimitsRequest {};
1937 let resp = self.inner.get_cluster_limits(req).await?;
1938 Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1939 }
1940
1941 pub async fn merge_compaction_group(
1942 &self,
1943 left_group_id: CompactionGroupId,
1944 right_group_id: CompactionGroupId,
1945 ) -> Result<()> {
1946 let req = MergeCompactionGroupRequest {
1947 left_group_id,
1948 right_group_id,
1949 };
1950 self.inner.merge_compaction_group(req).await?;
1951 Ok(())
1952 }
1953
1954 pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1956 let request = ListRateLimitsRequest {};
1957 let resp = self.inner.list_rate_limits(request).await?;
1958 Ok(resp.rate_limits)
1959 }
1960
1961 pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1962 let request = ListCdcProgressRequest {};
1963 let resp = self.inner.list_cdc_progress(request).await?;
1964 Ok(resp.cdc_progress)
1965 }
1966
1967 pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1968 let request = ListRefreshTableStatesRequest {};
1969 let resp = self.inner.list_refresh_table_states(request).await?;
1970 Ok(resp.states)
1971 }
1972
1973 pub async fn list_iceberg_compaction_status(
1974 &self,
1975 ) -> Result<Vec<list_iceberg_compaction_status_response::IcebergCompactionStatus>> {
1976 let request = ListIcebergCompactionStatusRequest {};
1977 let resp = self.inner.list_iceberg_compaction_status(request).await?;
1978 Ok(resp.statuses)
1979 }
1980
1981 pub async fn list_sink_log_store_tables(
1982 &self,
1983 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1984 let request = ListSinkLogStoreTablesRequest {};
1985 let resp = self.inner.list_sink_log_store_tables(request).await?;
1986 Ok(resp.tables)
1987 }
1988
1989 pub async fn create_iceberg_table(
1990 &self,
1991 table_job_info: PbTableJobInfo,
1992 sink_job_info: PbSinkJobInfo,
1993 iceberg_source: PbSource,
1994 if_not_exists: bool,
1995 ) -> Result<WaitVersion> {
1996 let request = CreateIcebergTableRequest {
1997 table_info: Some(table_job_info),
1998 sink_info: Some(sink_job_info),
1999 iceberg_source: Some(iceberg_source),
2000 if_not_exists,
2001 };
2002
2003 let resp = Box::pin(self.inner.create_iceberg_table(request)).await?;
2004 Ok(resp
2005 .version
2006 .ok_or_else(|| anyhow!("wait version not set"))?)
2007 }
2008
2009 pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
2010 let request = ListIcebergTablesRequest {};
2011 let resp = self.inner.list_iceberg_tables(request).await?;
2012 Ok(resp.iceberg_tables)
2013 }
2014
2015 pub async fn get_cluster_stack_trace(
2017 &self,
2018 actor_traces_format: ActorTracesFormat,
2019 ) -> Result<StackTraceResponse> {
2020 let request = StackTraceRequest {
2021 actor_traces_format: actor_traces_format as i32,
2022 };
2023 let resp = self.inner.stack_trace(request).await?;
2024 Ok(resp)
2025 }
2026
2027 pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
2028 let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
2029 self.inner.set_sync_log_store_aligned(request).await?;
2030 Ok(())
2031 }
2032
2033 pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
2034 self.inner.refresh(request).await
2035 }
2036
2037 pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
2038 let request = ListUnmigratedTablesRequest {};
2039 let resp = self.inner.list_unmigrated_tables(request).await?;
2040 Ok(resp
2041 .tables
2042 .into_iter()
2043 .map(|table| (table.table_id, table.table_name))
2044 .collect())
2045 }
2046}
2047
2048#[async_trait]
2049impl HummockMetaClient for MetaClient {
2050 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
2051 let req = UnpinVersionBeforeRequest {
2052 context_id: self.worker_id(),
2053 unpin_version_before,
2054 };
2055 self.inner.unpin_version_before(req).await?;
2056 Ok(())
2057 }
2058
2059 async fn get_current_version(&self) -> Result<HummockVersion> {
2060 let req = GetCurrentVersionRequest::default();
2061 Ok(HummockVersion::from_rpc_protobuf(
2062 &self
2063 .inner
2064 .get_current_version(req)
2065 .await?
2066 .current_version
2067 .unwrap(),
2068 ))
2069 }
2070
2071 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
2072 let resp = self
2073 .inner
2074 .get_new_object_ids(GetNewObjectIdsRequest { number })
2075 .await?;
2076 Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
2077 }
2078
2079 async fn commit_epoch_with_change_log(
2080 &self,
2081 _epoch: HummockEpoch,
2082 _sync_result: SyncResult,
2083 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
2084 ) -> Result<()> {
2085 panic!("Only meta service can commit_epoch in production.")
2086 }
2087
2088 async fn trigger_manual_compaction(
2089 &self,
2090 compaction_group_id: CompactionGroupId,
2091 table_id: JobId,
2092 level: u32,
2093 target_level: Option<u32>,
2094 sst_ids: Vec<HummockSstableId>,
2095 exclusive: bool,
2096 ) -> Result<bool> {
2097 let req = TriggerManualCompactionRequest {
2099 compaction_group_id,
2100 table_id,
2101 level,
2104 target_level,
2105 sst_ids,
2106 exclusive: Some(exclusive),
2107 ..Default::default()
2108 };
2109
2110 let resp = self.inner.trigger_manual_compaction(req).await?;
2111 Ok(resp.should_retry.unwrap_or(false))
2112 }
2113
2114 async fn trigger_full_gc(
2115 &self,
2116 sst_retention_time_sec: u64,
2117 prefix: Option<String>,
2118 ) -> Result<()> {
2119 self.inner
2120 .trigger_full_gc(TriggerFullGcRequest {
2121 sst_retention_time_sec,
2122 prefix,
2123 })
2124 .await?;
2125 Ok(())
2126 }
2127
2128 async fn subscribe_compaction_event(
2129 &self,
2130 ) -> Result<(
2131 UnboundedSender<SubscribeCompactionEventRequest>,
2132 BoxStream<'static, CompactionEventItem>,
2133 )> {
2134 let (request_sender, request_receiver) =
2135 unbounded_channel::<SubscribeCompactionEventRequest>();
2136 request_sender
2137 .send(SubscribeCompactionEventRequest {
2138 event: Some(subscribe_compaction_event_request::Event::Register(
2139 Register {
2140 context_id: self.worker_id(),
2141 },
2142 )),
2143 create_at: SystemTime::now()
2144 .duration_since(SystemTime::UNIX_EPOCH)
2145 .expect("Clock may have gone backwards")
2146 .as_millis() as u64,
2147 })
2148 .context("Failed to subscribe compaction event")?;
2149
2150 let stream = self
2151 .inner
2152 .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
2153 request_receiver,
2154 )))
2155 .await?;
2156
2157 Ok((request_sender, Box::pin(stream)))
2158 }
2159
2160 async fn get_version_by_epoch(
2161 &self,
2162 epoch: HummockEpoch,
2163 table_id: TableId,
2164 ) -> Result<PbHummockVersion> {
2165 self.get_version_by_epoch(epoch, table_id).await
2166 }
2167
2168 async fn subscribe_iceberg_compaction_event(
2169 &self,
2170 ) -> Result<(
2171 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2172 BoxStream<'static, IcebergCompactionEventItem>,
2173 )> {
2174 let (request_sender, request_receiver) =
2175 unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2176 request_sender
2177 .send(SubscribeIcebergCompactionEventRequest {
2178 event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2179 IcebergRegister {
2180 context_id: self.worker_id(),
2181 },
2182 )),
2183 create_at: SystemTime::now()
2184 .duration_since(SystemTime::UNIX_EPOCH)
2185 .expect("Clock may have gone backwards")
2186 .as_millis() as u64,
2187 })
2188 .context("Failed to subscribe compaction event")?;
2189
2190 let stream = self
2191 .inner
2192 .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2193 request_receiver,
2194 )))
2195 .await?;
2196
2197 Ok((request_sender, Box::pin(stream)))
2198 }
2199
2200 async fn get_table_change_logs(
2201 &self,
2202 epoch_only: bool,
2203 start_epoch_inclusive: Option<u64>,
2204 end_epoch_inclusive: Option<u64>,
2205 table_ids: Option<HashSet<TableId>>,
2206 exclude_empty: bool,
2207 limit: Option<u32>,
2208 ) -> Result<TableChangeLogs> {
2209 self.get_table_change_logs(
2210 epoch_only,
2211 start_epoch_inclusive,
2212 end_epoch_inclusive,
2213 table_ids,
2214 exclude_empty,
2215 limit,
2216 )
2217 .await
2218 }
2219}
2220
2221#[async_trait]
2222impl TelemetryInfoFetcher for MetaClient {
2223 async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2224 let resp = self
2225 .get_telemetry_info()
2226 .await
2227 .map_err(|e| e.to_report_string())?;
2228 let tracking_id = resp.get_tracking_id().ok();
2229 Ok(tracking_id.map(|id| id.to_owned()))
2230 }
2231}
2232
/// Sink coordination service client over a `Channel`.
pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2234
/// One gRPC client per meta service.
///
/// `GrpcMetaClientCore::new` creates every client from a single `Channel`, so the whole set
/// is replaced together when a new channel is used.
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}
2258
2259impl GrpcMetaClientCore {
2260 pub(crate) fn new(channel: Channel) -> Self {
2261 let cluster_client = ClusterServiceClient::new(channel.clone());
2262 let meta_member_client = MetaMemberClient::new(channel.clone());
2263 let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2264 let ddl_client =
2265 DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2266 let hummock_client =
2267 HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2268 let notification_client =
2269 NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2270 let stream_client =
2271 StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2272 let user_client = UserServiceClient::new(channel.clone());
2273 let scale_client =
2274 ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2275 let backup_client = BackupServiceClient::new(channel.clone());
2276 let telemetry_client =
2277 TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2278 let system_params_client = SystemParamsServiceClient::new(channel.clone());
2279 let session_params_client = SessionParamServiceClient::new(channel.clone());
2280 let serving_client = ServingServiceClient::new(channel.clone());
2281 let cloud_client = CloudServiceClient::new(channel.clone());
2282 let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2283 .max_decoding_message_size(usize::MAX);
2284 let event_log_client = EventLogServiceClient::new(channel.clone());
2285 let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2286 let hosted_iceberg_catalog_service_client =
2287 HostedIcebergCatalogServiceClient::new(channel.clone());
2288 let monitor_client = configured_monitor_service_client(MonitorServiceClient::new(channel));
2289
2290 GrpcMetaClientCore {
2291 cluster_client,
2292 meta_member_client,
2293 heartbeat_client,
2294 ddl_client,
2295 hummock_client,
2296 notification_client,
2297 stream_client,
2298 user_client,
2299 scale_client,
2300 backup_client,
2301 telemetry_client,
2302 system_params_client,
2303 session_params_client,
2304 serving_client,
2305 cloud_client,
2306 sink_coordinate_client,
2307 event_log_client,
2308 cluster_limit_client,
2309 hosted_iceberg_catalog_service_client,
2310 monitor_client,
2311 }
2312 }
2313}
2314
/// gRPC client to the meta service whose underlying clients can be swapped at runtime.
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    // Sends a oneshot result sender to the member monitor task to request a refresh; the
    // task reports the refresh outcome back through that oneshot sender.
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    // Shared, lock-protected set of service clients; replaced as a whole by
    // `MetaMemberManagement::recreate_core`.
    core: Arc<RwLock<GrpcMetaClientCore>>,
}
2323
/// Meta member service client over a `Channel`.
type MetaMemberClient = MetaMemberServiceClient<Channel>;
2325
/// Known meta members, keyed by address.
struct MetaMemberGroup {
    // Maps a member URI to its client; `None` until a client has been created for that
    // member (see `MetaMemberManagement::refresh_members`).
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}
2329
/// State used to track the meta leader and rebuild the client core when it changes.
struct MetaMemberManagement {
    // Shared client core that is overwritten when a new leader is discovered.
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    // Either a single member client (`Left`) or a group of known members (`Right`).
    members: Either<MetaMemberClient, MetaMemberGroup>,
    // URI of the leader the core is currently connected to.
    current_leader: http::Uri,
    // Provides `meta_leader_lease_secs`, which bounds the reconnect retries.
    meta_config: Arc<MetaConfig>,
}
2336
2337impl MetaMemberManagement {
2338 const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2339
2340 fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2341 format!("http://{}:{}", addr.host, addr.port)
2342 .parse()
2343 .unwrap()
2344 }
2345
2346 async fn recreate_core(&self, channel: Channel) {
2347 let mut core = self.core_ref.write().await;
2348 *core = GrpcMetaClientCore::new(channel);
2349 }
2350
2351 async fn refresh_members(&mut self) -> Result<()> {
2352 let leader_addr = match self.members.as_mut() {
2353 Either::Left(client) => {
2354 let resp = client
2355 .to_owned()
2356 .members(MembersRequest {})
2357 .await
2358 .map_err(RpcError::from_meta_status)?;
2359 let resp = resp.into_inner();
2360 resp.members.into_iter().find(|member| member.is_leader)
2361 }
2362 Either::Right(member_group) => {
2363 let mut fetched_members = None;
2364
2365 for (addr, client) in &mut member_group.members {
2366 let members: Result<_> = try {
2367 let mut client = match client {
2368 Some(cached_client) => cached_client.to_owned(),
2369 None => {
2370 let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2371 let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2372 .await
2373 .context("failed to create client")?;
2374 let new_client: MetaMemberClient =
2375 MetaMemberServiceClient::new(channel);
2376 *client = Some(new_client.clone());
2377
2378 new_client
2379 }
2380 };
2381
2382 let resp = client
2383 .members(MembersRequest {})
2384 .await
2385 .context("failed to fetch members")?;
2386
2387 resp.into_inner().members
2388 };
2389
2390 let fetched = members.is_ok();
2391 fetched_members = Some(members);
2392 if fetched {
2393 break;
2394 }
2395 }
2396
2397 let members = fetched_members
2398 .context("no member available in the list")?
2399 .context("could not refresh members")?;
2400
2401 let mut leader = None;
2403 for member in members {
2404 if member.is_leader {
2405 leader = Some(member.clone());
2406 }
2407
2408 let addr = Self::host_address_to_uri(member.address.unwrap());
2409 if !member_group.members.contains(&addr) {
2411 tracing::info!("new meta member joined: {}", addr);
2412 member_group.members.put(addr, None);
2413 }
2414 }
2415
2416 leader
2417 }
2418 };
2419
2420 if let Some(leader) = leader_addr {
2421 let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2422
2423 if discovered_leader != self.current_leader {
2424 tracing::info!("new meta leader {} discovered", discovered_leader);
2425
2426 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2427 Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2428 false,
2429 );
2430
2431 let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2432 let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2433 GrpcMetaClient::connect_to_endpoint(endpoint).await
2434 })
2435 .await?;
2436
2437 self.recreate_core(channel).await;
2438 self.current_leader = discovered_leader;
2439 }
2440 }
2441
2442 Ok(())
2443 }
2444}
2445
2446impl GrpcMetaClient {
2447 const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
2449 const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
2451 const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
2453 const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2455
2456 fn start_meta_member_monitor(
2457 &self,
2458 init_leader_addr: http::Uri,
2459 members: Either<MetaMemberClient, MetaMemberGroup>,
2460 force_refresh_receiver: Receiver<Sender<Result<()>>>,
2461 meta_config: Arc<MetaConfig>,
2462 ) -> Result<()> {
2463 let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2464 let current_leader = init_leader_addr;
2465
2466 let enable_period_tick = matches!(members, Either::Right(_));
2467
2468 let member_management = MetaMemberManagement {
2469 core_ref,
2470 members,
2471 current_leader,
2472 meta_config,
2473 };
2474
2475 let mut force_refresh_receiver = force_refresh_receiver;
2476
2477 tokio::spawn(async move {
2478 let mut member_management = member_management;
2479 let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2480
2481 loop {
2482 let event: Option<Sender<Result<()>>> = if enable_period_tick {
2483 tokio::select! {
2484 _ = ticker.tick() => None,
2485 result_sender = force_refresh_receiver.recv() => {
2486 if result_sender.is_none() {
2487 break;
2488 }
2489
2490 result_sender
2491 },
2492 }
2493 } else {
2494 let result_sender = force_refresh_receiver.recv().await;
2495
2496 if result_sender.is_none() {
2497 break;
2498 }
2499
2500 result_sender
2501 };
2502
2503 let tick_result = member_management.refresh_members().await;
2504 if let Err(e) = tick_result.as_ref() {
2505 tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2506 }
2507
2508 if let Some(sender) = event {
2509 let _resp = sender.send(tick_result);
2511 }
2512 }
2513 });
2514
2515 Ok(())
2516 }
2517
2518 async fn force_refresh_leader(&self) -> Result<()> {
2519 let (sender, receiver) = oneshot::channel();
2520
2521 self.member_monitor_event_sender
2522 .send(sender)
2523 .await
2524 .map_err(|e| anyhow!(e))?;
2525
2526 receiver.await.map_err(|e| anyhow!(e))?
2527 }
2528
2529 pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2531 let (channel, addr) = match strategy {
2532 MetaAddressStrategy::LoadBalance(addr) => {
2533 Self::try_build_rpc_channel(vec![addr.clone()]).await
2534 }
2535 MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2536 }?;
2537 let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2538 let client = GrpcMetaClient {
2539 member_monitor_event_sender: force_refresh_sender,
2540 core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2541 };
2542
2543 let meta_member_client = client.core.read().await.meta_member_client.clone();
2544 let members = match strategy {
2545 MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2546 MetaAddressStrategy::List(addrs) => {
2547 let mut members = LruCache::new(20.try_into().unwrap());
2548 for addr in addrs {
2549 members.put(addr.clone(), None);
2550 }
2551 members.put(addr.clone(), Some(meta_member_client));
2552
2553 Either::Right(MetaMemberGroup { members })
2554 }
2555 };
2556
2557 client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2558
2559 client.force_refresh_leader().await?;
2560
2561 Ok(client)
2562 }
2563
2564 fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2565 Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2566 }
2567
2568 pub(crate) async fn try_build_rpc_channel(
2569 addrs: impl IntoIterator<Item = http::Uri>,
2570 ) -> Result<(Channel, http::Uri)> {
2571 let endpoints: Vec<_> = addrs
2572 .into_iter()
2573 .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2574 .collect();
2575
2576 let mut last_error = None;
2577
2578 for (endpoint, addr) in endpoints {
2579 match Self::connect_to_endpoint(endpoint).await {
2580 Ok(channel) => {
2581 tracing::info!("Connect to meta server {} successfully", addr);
2582 return Ok((channel, addr));
2583 }
2584 Err(e) => {
2585 tracing::warn!(
2586 error = %e.as_report(),
2587 "Failed to connect to meta server {}, trying again",
2588 addr,
2589 );
2590 last_error = Some(e);
2591 }
2592 }
2593 }
2594
2595 if let Some(last_error) = last_error {
2596 Err(anyhow::anyhow!(last_error)
2597 .context("failed to connect to all meta servers")
2598 .into())
2599 } else {
2600 bail!("no meta server address provided")
2601 }
2602 }
2603
2604 async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2605 let channel = endpoint
2606 .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2607 .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2608 .connect_timeout(Duration::from_secs(5))
2609 .monitored_connect("grpc-meta-client", Default::default())
2610 .await?
2611 .wrapped();
2612
2613 Ok(channel)
2614 }
2615
2616 pub(crate) fn retry_strategy_to_bound(
2617 high_bound: Duration,
2618 exceed: bool,
2619 ) -> impl Iterator<Item = Duration> {
2620 let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2621 .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2622 .map(jitter);
2623
2624 let mut sum = Duration::default();
2625
2626 iter.take_while(move |duration| {
2627 sum += *duration;
2628
2629 if exceed {
2630 sum < high_bound + *duration
2631 } else {
2632 sum < high_bound
2633 }
2634 })
2635 }
2636}
2637
/// Invokes `$macro` with the full list of meta RPCs.
///
/// Each entry has the form `{ client, method, request type, response type }`, where
/// `client` is the name of a field of `GrpcMetaClientCore`. How an entry is expanded is
/// decided by the macro passed in.
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_sink_log_store_tables, ListSinkLogStoreTablesRequest, ListSinkLogStoreTablesResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
            ,{ stream_client, list_iceberg_compaction_status, ListIcebergCompactionStatusRequest, ListIcebergCompactionStatusResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, alter_source_properties_safe, AlterSourcePropertiesSafeRequest, AlterSourcePropertiesSafeResponse }
            ,{ stream_client, reset_source_splits, ResetSourceSplitsRequest, ResetSourceSplitsResponse }
            ,{ stream_client, inject_source_offsets, InjectSourceOffsetsRequest, InjectSourceOffsetsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_subscription_retention, AlterSubscriptionRetentionRequest, AlterSubscriptionRetentionResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_backfill_parallelism, AlterBackfillParallelismRequest, AlterBackfillParallelismResponse }
            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, risectl_resume_backfill, RisectlResumeBackfillRequest, RisectlResumeBackfillResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ hummock_client, get_table_change_logs, GetTableChangeLogsRequest, GetTableChangeLogsResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}
2786
2787impl GrpcMetaClient {
2788 async fn refresh_client_if_needed(&self, code: Code) {
2789 if matches!(
2790 code,
2791 Code::Unknown | Code::Unimplemented | Code::Unavailable
2792 ) {
2793 tracing::debug!("matching tonic code {}", code);
2794 let (result_sender, result_receiver) = oneshot::channel();
2795 if self
2796 .member_monitor_event_sender
2797 .try_send(result_sender)
2798 .is_ok()
2799 {
2800 if let Ok(Err(e)) = result_receiver.await {
2801 tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2802 }
2803 } else {
2804 tracing::debug!("skipping the current refresh, somewhere else is already doing it")
2805 }
2806 }
2807 }
2808}
2809
impl GrpcMetaClient {
    // Expands `meta_rpc_client_method_impl` over every entry listed in `for_all_meta_rpc!`.
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}