1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{
33 AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
34};
35use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
36use risingwave_common::hash::WorkerSlotMapping;
37use risingwave_common::id::{
38 ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, UserId, ViewId, WorkerId,
39};
40use risingwave_common::monitor::EndpointExt;
41use risingwave_common::system_param::AdaptiveParallelismStrategy;
42use risingwave_common::system_param::reader::SystemParamsReader;
43use risingwave_common::telemetry::report::TelemetryInfoFetcher;
44use risingwave_common::util::addr::HostAddr;
45use risingwave_common::util::meta_addr::MetaAddressStrategy;
46use risingwave_common::util::resource_util::cpu::total_cpu_available;
47use risingwave_common::util::resource_util::hostname;
48use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
49use risingwave_error::bail;
50use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
51use risingwave_hummock_sdk::change_log::{TableChangeLog, TableChangeLogs};
52use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
53use risingwave_hummock_sdk::{
54 CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
55};
56use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
57use risingwave_pb::backup_service::*;
58use risingwave_pb::catalog::{
59 Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
60 PbSubscription, PbTable, PbView, Table,
61};
62use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
63use risingwave_pb::cloud_service::*;
64use risingwave_pb::common::worker_node::Property;
65use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
66use risingwave_pb::configured_monitor_service_client;
67use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
68use risingwave_pb::ddl_service::alter_owner_request::Object;
69use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
70use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
71use risingwave_pb::ddl_service::*;
72use risingwave_pb::hummock::compact_task::TaskStatus;
73use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
74use risingwave_pb::hummock::get_table_change_logs_request::PbTableFilter;
75use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
76use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
77use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
78use risingwave_pb::hummock::write_limits::WriteLimit;
79use risingwave_pb::hummock::*;
80use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
81use risingwave_pb::iceberg_compaction::{
82 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
83 subscribe_iceberg_compaction_event_request,
84};
85use risingwave_pb::id::{ActorId, FragmentId, HummockSstableId, SourceId};
86use risingwave_pb::meta::alter_connector_props_request::{
87 AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
88};
89use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
90use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
91use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
92use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
93use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
94use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
95use risingwave_pb::meta::list_actor_states_response::ActorState;
96use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
97use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
98use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
99use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
100use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
101use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
102use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
103use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
104use risingwave_pb::meta::serving_service_client::ServingServiceClient;
105use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
106use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
107use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
108use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
109use risingwave_pb::meta::{FragmentDistribution, *};
110use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
111use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
112use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
113use risingwave_pb::secret::PbSecretRef;
114use risingwave_pb::stream_plan::StreamFragmentGraph;
115use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
116use risingwave_pb::user::update_user_request::UpdateField;
117use risingwave_pb::user::user_service_client::UserServiceClient;
118use risingwave_pb::user::*;
119use thiserror_ext::AsReport;
120use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
121use tokio::sync::oneshot::Sender;
122use tokio::sync::{RwLock, mpsc, oneshot};
123use tokio::task::JoinHandle;
124use tokio::time::{self};
125use tokio_retry::strategy::{ExponentialBackoff, jitter};
126use tokio_stream::wrappers::UnboundedReceiverStream;
127use tonic::transport::Endpoint;
128use tonic::{Code, Request, Streaming};
129
130use crate::channel::{Channel, WrappedChannelExt};
131use crate::error::{Result, RpcError};
132use crate::hummock_meta_client::{
133 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
134 IcebergCompactionEventItem,
135};
136use crate::meta_rpc_client_method_impl;
137
/// Handle a worker node uses to issue RPCs to the meta service.
#[derive(Clone, Debug)]
pub struct MetaClient {
    /// Id of this worker, taken from `AddWorkerNodeResponse::node_id` at registration.
    worker_id: WorkerId,
    /// Worker type that was passed to registration.
    worker_type: WorkerType,
    /// Address this worker registered with; sent as `host` in several requests.
    host_addr: HostAddr,
    /// Underlying gRPC client; every RPC method on this struct goes through it.
    inner: GrpcMetaClient,
    /// Meta configuration; `max_heartbeat_interval_secs` is read to build retry strategies.
    meta_config: Arc<MetaConfig>,
    /// Cluster id taken from the `AddWorkerNodeResponse` at registration.
    cluster_id: String,
    /// Set to `true` once `unregister` succeeds; `send_heartbeat` checks it before
    /// exiting the process on an `UnknownWorker` status.
    shutting_down: Arc<AtomicBool>,
}
149
150impl MetaClient {
151 pub fn worker_id(&self) -> WorkerId {
152 self.worker_id
153 }
154
155 pub fn host_addr(&self) -> &HostAddr {
156 &self.host_addr
157 }
158
159 pub fn worker_type(&self) -> WorkerType {
160 self.worker_type
161 }
162
163 pub fn cluster_id(&self) -> &str {
164 &self.cluster_id
165 }
166
167 pub async fn subscribe(
169 &self,
170 subscribe_type: SubscribeType,
171 ) -> Result<Streaming<SubscribeResponse>> {
172 let request = SubscribeRequest {
173 subscribe_type: subscribe_type as i32,
174 host: Some(self.host_addr.to_protobuf()),
175 worker_id: self.worker_id(),
176 };
177
178 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
179 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
180 true,
181 );
182
183 tokio_retry::Retry::spawn(retry_strategy, || async {
184 let request = request.clone();
185 self.inner.subscribe(request).await
186 })
187 .await
188 }
189
190 pub async fn create_connection(
191 &self,
192 connection_name: String,
193 database_id: DatabaseId,
194 schema_id: SchemaId,
195 owner_id: UserId,
196 req: create_connection_request::Payload,
197 ) -> Result<WaitVersion> {
198 let request = CreateConnectionRequest {
199 name: connection_name,
200 database_id,
201 schema_id,
202 owner_id,
203 payload: Some(req),
204 };
205 let resp = self.inner.create_connection(request).await?;
206 Ok(resp
207 .version
208 .ok_or_else(|| anyhow!("wait version not set"))?)
209 }
210
211 pub async fn create_secret(
212 &self,
213 secret_name: String,
214 database_id: DatabaseId,
215 schema_id: SchemaId,
216 owner_id: UserId,
217 value: Vec<u8>,
218 ) -> Result<WaitVersion> {
219 let request = CreateSecretRequest {
220 name: secret_name,
221 database_id,
222 schema_id,
223 owner_id,
224 value,
225 };
226 let resp = self.inner.create_secret(request).await?;
227 Ok(resp
228 .version
229 .ok_or_else(|| anyhow!("wait version not set"))?)
230 }
231
232 pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
233 let request = ListConnectionsRequest {};
234 let resp = self.inner.list_connections(request).await?;
235 Ok(resp.connections)
236 }
237
238 pub async fn drop_connection(
239 &self,
240 connection_id: ConnectionId,
241 cascade: bool,
242 ) -> Result<WaitVersion> {
243 let request = DropConnectionRequest {
244 connection_id,
245 cascade,
246 };
247 let resp = self.inner.drop_connection(request).await?;
248 Ok(resp
249 .version
250 .ok_or_else(|| anyhow!("wait version not set"))?)
251 }
252
253 pub async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<WaitVersion> {
254 let request = DropSecretRequest { secret_id, cascade };
255 let resp = self.inner.drop_secret(request).await?;
256 Ok(resp
257 .version
258 .ok_or_else(|| anyhow!("wait version not set"))?)
259 }
260
261 pub async fn register_new(
265 addr_strategy: MetaAddressStrategy,
266 worker_type: WorkerType,
267 addr: &HostAddr,
268 property: Property,
269 meta_config: Arc<MetaConfig>,
270 ) -> (Self, SystemParamsReader) {
271 let ret =
272 Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
273
274 match ret {
275 Ok(ret) => ret,
276 Err(err) => {
277 tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
278 std::process::exit(1);
279 }
280 }
281 }
282
/// Connects to the meta service, adds this worker node, fetches the initial system
/// params and assembles the `MetaClient`.
///
/// # Errors
/// Returns the error of the last attempt if connecting, adding the worker node or
/// fetching the system params keeps failing.
///
/// # Panics
/// Panics if the add-worker response has no `node_id` or the system params response
/// has no `params`.
async fn register_new_inner(
    addr_strategy: MetaAddressStrategy,
    worker_type: WorkerType,
    addr: &HostAddr,
    property: Property,
    meta_config: Arc<MetaConfig>,
) -> Result<(Self, SystemParamsReader)> {
    tracing::info!("register meta client using strategy: {}", addr_strategy);

    // Retry strategy derived from the configured maximum heartbeat interval.
    let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
        Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
        true,
    );

    let init_result: Result<_> = tokio_retry::RetryIf::spawn(
        retry_strategy,
        || async {
            // A fresh gRPC client is created on every attempt.
            let grpc_meta_client =
                GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

            // Register this node, reporting its version and resources.
            let add_worker_resp = grpc_meta_client
                .add_worker_node(AddWorkerNodeRequest {
                    worker_type: worker_type as i32,
                    host: Some(addr.to_protobuf()),
                    property: Some(property.clone()),
                    resource: Some(risingwave_pb::common::worker_node::Resource {
                        rw_version: RW_VERSION.to_owned(),
                        total_memory_bytes: system_memory_available_bytes() as _,
                        total_cpu_cores: total_cpu_available() as _,
                        hostname: hostname(),
                    }),
                })
                .await
                .context("failed to add worker node")?;

            let system_params_resp = grpc_meta_client
                .get_system_params(GetSystemParamsRequest {})
                .await
                .context("failed to get initial system params")?;

            Ok((add_worker_resp, system_params_resp, grpc_meta_client))
        },
        // Only retry errors that do not originate from the tonic server implementation.
        |e: &RpcError| !e.is_from_tonic_server_impl(),
    )
    .await;

    let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
    let worker_id = add_worker_resp
        .node_id
        .expect("AddWorkerNodeResponse::node_id is empty");

    let meta_client = Self {
        worker_id,
        worker_type,
        host_addr: addr.clone(),
        inner: grpc_meta_client,
        meta_config: meta_config.clone(),
        cluster_id: add_worker_resp.cluster_id,
        shutting_down: Arc::new(false.into()),
    };

    // The `Once` guard installs the panic hook at most once per process, using a
    // clone of the first client that reaches this point.
    static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
    REPORT_PANIC.call_once(|| {
        let meta_client_clone = meta_client.clone();
        std::panic::update_hook(move |default_hook, info| {
            // Try to report the panic through the client, then run the previous hook.
            meta_client_clone.try_add_panic_event_blocking(info, None);
            default_hook(info);
        });
    });

    Ok((meta_client, system_params_resp.params.unwrap().into()))
}
359
360 pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
362 let request = ActivateWorkerNodeRequest {
363 host: Some(addr.to_protobuf()),
364 node_id: self.worker_id,
365 };
366 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
367 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
368 true,
369 );
370 tokio_retry::Retry::spawn(retry_strategy, || async {
371 let request = request.clone();
372 self.inner.activate_worker_node(request).await
373 })
374 .await?;
375
376 Ok(())
377 }
378
379 pub async fn send_heartbeat(&self) -> Result<()> {
381 let request = HeartbeatRequest {
382 node_id: self.worker_id,
383 resource: Some(risingwave_pb::common::worker_node::Resource {
384 rw_version: RW_VERSION.to_owned(),
385 total_memory_bytes: system_memory_available_bytes() as _,
386 total_cpu_cores: total_cpu_available() as _,
387 hostname: hostname(),
388 }),
389 };
390 let resp = self.inner.heartbeat(request).await?;
391 if let Some(status) = resp.status
392 && status.code() == risingwave_pb::common::status::Code::UnknownWorker
393 {
394 if !self.shutting_down.load(Relaxed) {
397 tracing::error!(message = status.message, "worker expired");
398 std::process::exit(1);
399 }
400 }
401 Ok(())
402 }
403
404 pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
405 let request = CreateDatabaseRequest { db: Some(db) };
406 let resp = self.inner.create_database(request).await?;
407 Ok(resp
409 .version
410 .ok_or_else(|| anyhow!("wait version not set"))?)
411 }
412
413 pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
414 let request = CreateSchemaRequest {
415 schema: Some(schema),
416 };
417 let resp = self.inner.create_schema(request).await?;
418 Ok(resp
420 .version
421 .ok_or_else(|| anyhow!("wait version not set"))?)
422 }
423
424 pub async fn create_materialized_view(
425 &self,
426 table: PbTable,
427 graph: StreamFragmentGraph,
428 dependencies: HashSet<ObjectId>,
429 resource_type: streaming_job_resource_type::ResourceType,
430 if_not_exists: bool,
431 refresh_interval_sec: Option<u64>,
432 ) -> Result<WaitVersion> {
433 let request = CreateMaterializedViewRequest {
434 materialized_view: Some(table),
435 fragment_graph: Some(graph),
436 resource_type: Some(PbStreamingJobResourceType {
437 resource_type: Some(resource_type),
438 }),
439 dependencies: dependencies.into_iter().collect(),
440 if_not_exists,
441 refresh_interval_sec,
442 };
443 let resp = self.inner.create_materialized_view(request).await?;
444 Ok(resp
446 .version
447 .ok_or_else(|| anyhow!("wait version not set"))?)
448 }
449
450 pub async fn drop_materialized_view(
451 &self,
452 table_id: TableId,
453 cascade: bool,
454 ) -> Result<WaitVersion> {
455 let request = DropMaterializedViewRequest { table_id, cascade };
456
457 let resp = self.inner.drop_materialized_view(request).await?;
458 Ok(resp
459 .version
460 .ok_or_else(|| anyhow!("wait version not set"))?)
461 }
462
463 pub async fn create_source(
464 &self,
465 source: PbSource,
466 graph: Option<StreamFragmentGraph>,
467 if_not_exists: bool,
468 ) -> Result<WaitVersion> {
469 let request = CreateSourceRequest {
470 source: Some(source),
471 fragment_graph: graph,
472 if_not_exists,
473 };
474
475 let resp = self.inner.create_source(request).await?;
476 Ok(resp
477 .version
478 .ok_or_else(|| anyhow!("wait version not set"))?)
479 }
480
481 pub async fn create_sink(
482 &self,
483 sink: PbSink,
484 graph: StreamFragmentGraph,
485 dependencies: HashSet<ObjectId>,
486 resource_type: streaming_job_resource_type::ResourceType,
487 if_not_exists: bool,
488 ) -> Result<WaitVersion> {
489 let request = CreateSinkRequest {
490 sink: Some(sink),
491 fragment_graph: Some(graph),
492 dependencies: dependencies.into_iter().collect(),
493 if_not_exists,
494 resource_type: Some(PbStreamingJobResourceType {
495 resource_type: Some(resource_type),
496 }),
497 };
498
499 let resp = self.inner.create_sink(request).await?;
500 Ok(resp
501 .version
502 .ok_or_else(|| anyhow!("wait version not set"))?)
503 }
504
505 pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
506 let request = CreateSubscriptionRequest {
507 subscription: Some(subscription),
508 };
509
510 let resp = self.inner.create_subscription(request).await?;
511 Ok(resp
512 .version
513 .ok_or_else(|| anyhow!("wait version not set"))?)
514 }
515
516 pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
517 let request = CreateFunctionRequest {
518 function: Some(function),
519 };
520 let resp = self.inner.create_function(request).await?;
521 Ok(resp
522 .version
523 .ok_or_else(|| anyhow!("wait version not set"))?)
524 }
525
526 pub async fn create_table(
527 &self,
528 source: Option<PbSource>,
529 table: PbTable,
530 graph: StreamFragmentGraph,
531 job_type: PbTableJobType,
532 if_not_exists: bool,
533 dependencies: HashSet<ObjectId>,
534 ) -> Result<WaitVersion> {
535 let request = CreateTableRequest {
536 materialized_view: Some(table),
537 fragment_graph: Some(graph),
538 source,
539 job_type: job_type as _,
540 if_not_exists,
541 dependencies: dependencies.into_iter().collect(),
542 };
543 let resp = self.inner.create_table(request).await?;
544 Ok(resp
546 .version
547 .ok_or_else(|| anyhow!("wait version not set"))?)
548 }
549
550 pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
551 let request = CommentOnRequest {
552 comment: Some(comment),
553 };
554 let resp = self.inner.comment_on(request).await?;
555 Ok(resp
556 .version
557 .ok_or_else(|| anyhow!("wait version not set"))?)
558 }
559
560 pub async fn alter_name(
561 &self,
562 object: alter_name_request::Object,
563 name: &str,
564 ) -> Result<WaitVersion> {
565 let request = AlterNameRequest {
566 object: Some(object),
567 new_name: name.to_owned(),
568 };
569 let resp = self.inner.alter_name(request).await?;
570 Ok(resp
571 .version
572 .ok_or_else(|| anyhow!("wait version not set"))?)
573 }
574
575 pub async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<WaitVersion> {
576 let request = AlterOwnerRequest {
577 object: Some(object),
578 owner_id,
579 };
580 let resp = self.inner.alter_owner(request).await?;
581 Ok(resp
582 .version
583 .ok_or_else(|| anyhow!("wait version not set"))?)
584 }
585
586 pub async fn alter_subscription_retention(
587 &self,
588 subscription_id: SubscriptionId,
589 retention_seconds: u64,
590 definition: String,
591 ) -> Result<WaitVersion> {
592 let request = AlterSubscriptionRetentionRequest {
593 subscription_id,
594 retention_seconds,
595 definition,
596 };
597 let resp = self.inner.alter_subscription_retention(request).await?;
598 Ok(resp
599 .version
600 .ok_or_else(|| anyhow!("wait version not set"))?)
601 }
602
603 pub async fn alter_database_param(
604 &self,
605 database_id: DatabaseId,
606 param: AlterDatabaseParam,
607 ) -> Result<WaitVersion> {
608 let request = match param {
609 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
610 let barrier_interval_ms = OptionalUint32 {
611 value: barrier_interval_ms,
612 };
613 AlterDatabaseParamRequest {
614 database_id,
615 param: Some(alter_database_param_request::Param::BarrierIntervalMs(
616 barrier_interval_ms,
617 )),
618 }
619 }
620 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
621 let checkpoint_frequency = OptionalUint64 {
622 value: checkpoint_frequency,
623 };
624 AlterDatabaseParamRequest {
625 database_id,
626 param: Some(alter_database_param_request::Param::CheckpointFrequency(
627 checkpoint_frequency,
628 )),
629 }
630 }
631 };
632 let resp = self.inner.alter_database_param(request).await?;
633 Ok(resp
634 .version
635 .ok_or_else(|| anyhow!("wait version not set"))?)
636 }
637
638 pub async fn alter_set_schema(
639 &self,
640 object: alter_set_schema_request::Object,
641 new_schema_id: SchemaId,
642 ) -> Result<WaitVersion> {
643 let request = AlterSetSchemaRequest {
644 new_schema_id,
645 object: Some(object),
646 };
647 let resp = self.inner.alter_set_schema(request).await?;
648 Ok(resp
649 .version
650 .ok_or_else(|| anyhow!("wait version not set"))?)
651 }
652
653 pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
654 let request = AlterSourceRequest {
655 source: Some(source),
656 };
657 let resp = self.inner.alter_source(request).await?;
658 Ok(resp
659 .version
660 .ok_or_else(|| anyhow!("wait version not set"))?)
661 }
662
663 pub async fn alter_parallelism(
664 &self,
665 job_id: JobId,
666 parallelism: PbTableParallelism,
667 adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
668 deferred: bool,
669 ) -> Result<()> {
670 let request = AlterParallelismRequest {
671 table_id: job_id,
672 parallelism: Some(parallelism),
673 deferred,
674 adaptive_parallelism_strategy: adaptive_parallelism_strategy
675 .map(|strategy| strategy.to_string()),
676 };
677
678 self.inner.alter_parallelism(request).await?;
679 Ok(())
680 }
681
682 pub async fn alter_backfill_parallelism(
683 &self,
684 job_id: JobId,
685 parallelism: Option<PbTableParallelism>,
686 adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
687 deferred: bool,
688 ) -> Result<()> {
689 let request = AlterBackfillParallelismRequest {
690 table_id: job_id,
691 parallelism,
692 deferred,
693 adaptive_parallelism_strategy: adaptive_parallelism_strategy
694 .map(|strategy| strategy.to_string()),
695 };
696
697 self.inner.alter_backfill_parallelism(request).await?;
698 Ok(())
699 }
700
701 pub async fn alter_streaming_job_config(
702 &self,
703 job_id: JobId,
704 entries_to_add: HashMap<String, String>,
705 keys_to_remove: Vec<String>,
706 ) -> Result<()> {
707 let request = AlterStreamingJobConfigRequest {
708 job_id,
709 entries_to_add,
710 keys_to_remove,
711 };
712
713 self.inner.alter_streaming_job_config(request).await?;
714 Ok(())
715 }
716
717 pub async fn alter_fragment_parallelism(
718 &self,
719 fragment_ids: Vec<FragmentId>,
720 parallelism: Option<PbTableParallelism>,
721 ) -> Result<()> {
722 let request = AlterFragmentParallelismRequest {
723 fragment_ids,
724 parallelism,
725 };
726
727 self.inner.alter_fragment_parallelism(request).await?;
728 Ok(())
729 }
730
731 pub async fn alter_cdc_table_backfill_parallelism(
732 &self,
733 table_id: JobId,
734 parallelism: PbTableParallelism,
735 ) -> Result<()> {
736 let request = AlterCdcTableBackfillParallelismRequest {
737 table_id,
738 parallelism: Some(parallelism),
739 };
740 self.inner
741 .alter_cdc_table_backfill_parallelism(request)
742 .await?;
743 Ok(())
744 }
745
746 pub async fn alter_resource_group(
747 &self,
748 job_id: JobId,
749 resource_group: Option<String>,
750 deferred: bool,
751 ) -> Result<()> {
752 let request = AlterResourceGroupRequest {
753 job_id,
754 resource_group,
755 deferred,
756 };
757
758 self.inner.alter_resource_group(request).await?;
759 Ok(())
760 }
761
762 pub async fn alter_database_resource_group(
763 &self,
764 database_id: DatabaseId,
765 resource_group: Option<String>,
766 deferred: bool,
767 ) -> Result<WaitVersion> {
768 let request = AlterDatabaseResourceGroupRequest {
769 database_id,
770 resource_group,
771 deferred,
772 };
773
774 let resp = self.inner.alter_database_resource_group(request).await?;
775 Ok(resp
776 .version
777 .ok_or_else(|| anyhow!("wait version not set"))?)
778 }
779
780 pub async fn alter_swap_rename(
781 &self,
782 object: alter_swap_rename_request::Object,
783 ) -> Result<WaitVersion> {
784 let request = AlterSwapRenameRequest {
785 object: Some(object),
786 };
787 let resp = self.inner.alter_swap_rename(request).await?;
788 Ok(resp
789 .version
790 .ok_or_else(|| anyhow!("wait version not set"))?)
791 }
792
793 pub async fn alter_secret(
794 &self,
795 secret_id: SecretId,
796 secret_name: String,
797 database_id: DatabaseId,
798 schema_id: SchemaId,
799 owner_id: UserId,
800 value: Vec<u8>,
801 ) -> Result<WaitVersion> {
802 let request = AlterSecretRequest {
803 secret_id,
804 name: secret_name,
805 database_id,
806 schema_id,
807 owner_id,
808 value,
809 };
810 let resp = self.inner.alter_secret(request).await?;
811 Ok(resp
812 .version
813 .ok_or_else(|| anyhow!("wait version not set"))?)
814 }
815
816 pub async fn replace_job(
817 &self,
818 graph: StreamFragmentGraph,
819 replace_job: ReplaceJob,
820 ) -> Result<WaitVersion> {
821 let request = ReplaceJobPlanRequest {
822 plan: Some(ReplaceJobPlan {
823 fragment_graph: Some(graph),
824 replace_job: Some(replace_job),
825 }),
826 };
827 let resp = self.inner.replace_job_plan(request).await?;
828 Ok(resp
830 .version
831 .ok_or_else(|| anyhow!("wait version not set"))?)
832 }
833
834 pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
835 let request = AutoSchemaChangeRequest {
836 schema_change: Some(schema_change),
837 };
838 let _ = self.inner.auto_schema_change(request).await?;
839 Ok(())
840 }
841
842 pub async fn create_view(
843 &self,
844 view: PbView,
845 dependencies: HashSet<ObjectId>,
846 ) -> Result<WaitVersion> {
847 let request = CreateViewRequest {
848 view: Some(view),
849 dependencies: dependencies.into_iter().collect(),
850 };
851 let resp = self.inner.create_view(request).await?;
852 Ok(resp
854 .version
855 .ok_or_else(|| anyhow!("wait version not set"))?)
856 }
857
858 pub async fn create_index(
859 &self,
860 index: PbIndex,
861 table: PbTable,
862 graph: StreamFragmentGraph,
863 resource_type: streaming_job_resource_type::ResourceType,
864 if_not_exists: bool,
865 ) -> Result<WaitVersion> {
866 let request = CreateIndexRequest {
867 index: Some(index),
868 index_table: Some(table),
869 fragment_graph: Some(graph),
870 if_not_exists,
871 resource_type: Some(PbStreamingJobResourceType {
872 resource_type: Some(resource_type),
873 }),
874 };
875 let resp = self.inner.create_index(request).await?;
876 Ok(resp
878 .version
879 .ok_or_else(|| anyhow!("wait version not set"))?)
880 }
881
882 pub async fn drop_table(
883 &self,
884 source_id: Option<SourceId>,
885 table_id: TableId,
886 cascade: bool,
887 ) -> Result<WaitVersion> {
888 let request = DropTableRequest {
889 source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
890 table_id,
891 cascade,
892 };
893
894 let resp = self.inner.drop_table(request).await?;
895 Ok(resp
896 .version
897 .ok_or_else(|| anyhow!("wait version not set"))?)
898 }
899
900 pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
901 let request = CompactIcebergTableRequest { sink_id };
902 let resp = self.inner.compact_iceberg_table(request).await?;
903 Ok(resp.task_id)
904 }
905
906 pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
907 let request = ExpireIcebergTableSnapshotsRequest { sink_id };
908 let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
909 Ok(())
910 }
911
912 pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
913 let request = DropViewRequest { view_id, cascade };
914 let resp = self.inner.drop_view(request).await?;
915 Ok(resp
916 .version
917 .ok_or_else(|| anyhow!("wait version not set"))?)
918 }
919
920 pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
921 let request = DropSourceRequest { source_id, cascade };
922 let resp = self.inner.drop_source(request).await?;
923 Ok(resp
924 .version
925 .ok_or_else(|| anyhow!("wait version not set"))?)
926 }
927
928 pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
929 let request = ResetSourceRequest { source_id };
930 let resp = self.inner.reset_source(request).await?;
931 Ok(resp
932 .version
933 .ok_or_else(|| anyhow!("wait version not set"))?)
934 }
935
936 pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
937 let request = DropSinkRequest { sink_id, cascade };
938 let resp = self.inner.drop_sink(request).await?;
939 Ok(resp
940 .version
941 .ok_or_else(|| anyhow!("wait version not set"))?)
942 }
943
944 pub async fn drop_subscription(
945 &self,
946 subscription_id: SubscriptionId,
947 cascade: bool,
948 ) -> Result<WaitVersion> {
949 let request = DropSubscriptionRequest {
950 subscription_id,
951 cascade,
952 };
953 let resp = self.inner.drop_subscription(request).await?;
954 Ok(resp
955 .version
956 .ok_or_else(|| anyhow!("wait version not set"))?)
957 }
958
959 pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
960 let request = DropIndexRequest { index_id, cascade };
961 let resp = self.inner.drop_index(request).await?;
962 Ok(resp
963 .version
964 .ok_or_else(|| anyhow!("wait version not set"))?)
965 }
966
967 pub async fn drop_function(
968 &self,
969 function_id: FunctionId,
970 cascade: bool,
971 ) -> Result<WaitVersion> {
972 let request = DropFunctionRequest {
973 function_id,
974 cascade,
975 };
976 let resp = self.inner.drop_function(request).await?;
977 Ok(resp
978 .version
979 .ok_or_else(|| anyhow!("wait version not set"))?)
980 }
981
982 pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
983 let request = DropDatabaseRequest { database_id };
984 let resp = self.inner.drop_database(request).await?;
985 Ok(resp
986 .version
987 .ok_or_else(|| anyhow!("wait version not set"))?)
988 }
989
990 pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
991 let request = DropSchemaRequest { schema_id, cascade };
992 let resp = self.inner.drop_schema(request).await?;
993 Ok(resp
994 .version
995 .ok_or_else(|| anyhow!("wait version not set"))?)
996 }
997
998 pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
1000 let request = CreateUserRequest { user: Some(user) };
1001 let resp = self.inner.create_user(request).await?;
1002 Ok(resp.version)
1003 }
1004
1005 pub async fn drop_user(&self, user_id: UserId) -> Result<u64> {
1006 let request = DropUserRequest { user_id };
1007 let resp = self.inner.drop_user(request).await?;
1008 Ok(resp.version)
1009 }
1010
1011 pub async fn update_user(
1012 &self,
1013 user: UserInfo,
1014 update_fields: Vec<UpdateField>,
1015 ) -> Result<u64> {
1016 let request = UpdateUserRequest {
1017 user: Some(user),
1018 update_fields: update_fields
1019 .into_iter()
1020 .map(|field| field as i32)
1021 .collect::<Vec<_>>(),
1022 };
1023 let resp = self.inner.update_user(request).await?;
1024 Ok(resp.version)
1025 }
1026
1027 pub async fn grant_privilege(
1028 &self,
1029 user_ids: Vec<UserId>,
1030 privileges: Vec<GrantPrivilege>,
1031 with_grant_option: bool,
1032 granted_by: UserId,
1033 ) -> Result<u64> {
1034 let request = GrantPrivilegeRequest {
1035 user_ids,
1036 privileges,
1037 with_grant_option,
1038 granted_by,
1039 };
1040 let resp = self.inner.grant_privilege(request).await?;
1041 Ok(resp.version)
1042 }
1043
1044 pub async fn revoke_privilege(
1045 &self,
1046 user_ids: Vec<UserId>,
1047 privileges: Vec<GrantPrivilege>,
1048 granted_by: UserId,
1049 revoke_by: UserId,
1050 revoke_grant_option: bool,
1051 cascade: bool,
1052 ) -> Result<u64> {
1053 let request = RevokePrivilegeRequest {
1054 user_ids,
1055 privileges,
1056 granted_by,
1057 revoke_by,
1058 revoke_grant_option,
1059 cascade,
1060 };
1061 let resp = self.inner.revoke_privilege(request).await?;
1062 Ok(resp.version)
1063 }
1064
1065 pub async fn alter_default_privilege(
1066 &self,
1067 user_ids: Vec<UserId>,
1068 database_id: DatabaseId,
1069 schema_ids: Vec<SchemaId>,
1070 operation: AlterDefaultPrivilegeOperation,
1071 granted_by: UserId,
1072 ) -> Result<()> {
1073 let request = AlterDefaultPrivilegeRequest {
1074 user_ids,
1075 database_id,
1076 schema_ids,
1077 operation: Some(operation),
1078 granted_by,
1079 };
1080 self.inner.alter_default_privilege(request).await?;
1081 Ok(())
1082 }
1083
1084 pub async fn unregister(&self) -> Result<()> {
1086 let request = DeleteWorkerNodeRequest {
1087 host: Some(self.host_addr.to_protobuf()),
1088 };
1089 self.inner.delete_worker_node(request).await?;
1090 self.shutting_down.store(true, Relaxed);
1091 Ok(())
1092 }
1093
1094 pub async fn try_unregister(&self) {
1096 match self.unregister().await {
1097 Ok(_) => {
1098 tracing::info!(
1099 worker_id = %self.worker_id(),
1100 "successfully unregistered from meta service",
1101 )
1102 }
1103 Err(e) => {
1104 tracing::warn!(
1105 error = %e.as_report(),
1106 worker_id = %self.worker_id(),
1107 "failed to unregister from meta service",
1108 );
1109 }
1110 }
1111 }
1112
1113 pub async fn list_worker_nodes(
1114 &self,
1115 worker_type: Option<WorkerType>,
1116 ) -> Result<Vec<WorkerNode>> {
1117 let request = ListAllNodesRequest {
1118 worker_type: worker_type.map(Into::into),
1119 include_starting_nodes: true,
1120 };
1121 let resp = self.inner.list_all_nodes(request).await?;
1122 Ok(resp.nodes)
1123 }
1124
    /// Spawns a tokio task that calls `meta_client.send_heartbeat()` once per
    /// tick of an interval timer with period `min_interval`.
    ///
    /// Returns the task's `JoinHandle` together with the sending half of a
    /// oneshot channel; the loop exits when the receiving half resolves.
    ///
    /// Heartbeat failures and timeouts are only logged at `warn` level; they
    /// never terminate the loop.
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    // `biased` polls the branches in declaration order, so a
                    // pending shutdown wins over a ready tick.
                    biased;
                    // Shutdown requested. The `_` pattern matches any result of
                    // the receiver, so per tokio's oneshot semantics this also
                    // fires when the sender is dropped without sending.
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    // Otherwise wait for the next interval tick.
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    // Bound a single heartbeat RPC to three interval periods.
                    min_interval * 3,
                    meta_client.send_heartbeat(),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }
1164
1165 pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1166 let request = RisectlListStateTablesRequest {};
1167 let resp = self.inner.risectl_list_state_tables(request).await?;
1168 Ok(resp.tables)
1169 }
1170
1171 pub async fn risectl_resume_backfill(
1172 &self,
1173 request: RisectlResumeBackfillRequest,
1174 ) -> Result<()> {
1175 self.inner.risectl_resume_backfill(request).await?;
1176 Ok(())
1177 }
1178
1179 pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1180 let request = FlushRequest { database_id };
1181 let resp = self.inner.flush(request).await?;
1182 Ok(resp.hummock_version_id)
1183 }
1184
1185 pub async fn wait(&self, job_id: Option<JobId>) -> Result<WaitVersion> {
1186 let request = WaitRequest { job_id };
1187 let resp = self.inner.wait(request).await?;
1188 Ok(resp
1189 .version
1190 .ok_or_else(|| anyhow!("wait version not set"))?)
1191 }
1192
1193 pub async fn recover(&self) -> Result<()> {
1194 let request = RecoverRequest {};
1195 self.inner.recover(request).await?;
1196 Ok(())
1197 }
1198
1199 pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1200 let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1201 let resp = self.inner.cancel_creating_jobs(request).await?;
1202 Ok(resp.canceled_jobs)
1203 }
1204
1205 pub async fn list_table_fragments(
1206 &self,
1207 job_ids: &[JobId],
1208 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
1209 let request = ListTableFragmentsRequest {
1210 table_ids: job_ids.to_vec(),
1211 };
1212 let resp = self.inner.list_table_fragments(request).await?;
1213 Ok(resp.table_fragments)
1214 }
1215
1216 pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1217 let resp = self
1218 .inner
1219 .list_streaming_job_states(ListStreamingJobStatesRequest {})
1220 .await?;
1221 Ok(resp.states)
1222 }
1223
1224 pub async fn list_fragment_distributions(
1225 &self,
1226 include_node: bool,
1227 ) -> Result<Vec<FragmentDistribution>> {
1228 let resp = self
1229 .inner
1230 .list_fragment_distribution(ListFragmentDistributionRequest {
1231 include_node: Some(include_node),
1232 })
1233 .await?;
1234 Ok(resp.distributions)
1235 }
1236
1237 pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
1238 let resp = self
1239 .inner
1240 .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {
1241 include_node: Some(true),
1242 })
1243 .await?;
1244 Ok(resp.distributions)
1245 }
1246
1247 pub async fn get_fragment_by_id(
1248 &self,
1249 fragment_id: FragmentId,
1250 ) -> Result<Option<FragmentDistribution>> {
1251 let resp = self
1252 .inner
1253 .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1254 .await?;
1255 Ok(resp.distribution)
1256 }
1257
1258 pub async fn get_fragment_vnodes(
1259 &self,
1260 fragment_id: FragmentId,
1261 ) -> Result<Vec<(ActorId, Vec<u32>)>> {
1262 let resp = self
1263 .inner
1264 .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
1265 .await?;
1266 Ok(resp
1267 .actor_vnodes
1268 .into_iter()
1269 .map(|actor| (actor.actor_id, actor.vnode_indices))
1270 .collect())
1271 }
1272
1273 pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
1274 let resp = self
1275 .inner
1276 .get_actor_vnodes(GetActorVnodesRequest { actor_id })
1277 .await?;
1278 Ok(resp.vnode_indices)
1279 }
1280
1281 pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1282 let resp = self
1283 .inner
1284 .list_actor_states(ListActorStatesRequest {})
1285 .await?;
1286 Ok(resp.states)
1287 }
1288
1289 pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1290 let resp = self
1291 .inner
1292 .list_actor_splits(ListActorSplitsRequest {})
1293 .await?;
1294
1295 Ok(resp.actor_splits)
1296 }
1297
1298 pub async fn pause(&self) -> Result<PauseResponse> {
1299 let request = PauseRequest {};
1300 let resp = self.inner.pause(request).await?;
1301 Ok(resp)
1302 }
1303
1304 pub async fn resume(&self) -> Result<ResumeResponse> {
1305 let request = ResumeRequest {};
1306 let resp = self.inner.resume(request).await?;
1307 Ok(resp)
1308 }
1309
1310 pub async fn apply_throttle(
1311 &self,
1312 throttle_target: PbThrottleTarget,
1313 throttle_type: risingwave_pb::common::PbThrottleType,
1314 id: u32,
1315 rate: Option<u32>,
1316 ) -> Result<ApplyThrottleResponse> {
1317 let request = ApplyThrottleRequest {
1318 throttle_target: throttle_target as i32,
1319 throttle_type: throttle_type as i32,
1320 id,
1321 rate,
1322 };
1323 let resp = self.inner.apply_throttle(request).await?;
1324 Ok(resp)
1325 }
1326
1327 pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1328 let resp = self
1329 .inner
1330 .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1331 .await?;
1332 Ok(resp.get_status().unwrap())
1333 }
1334
1335 pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1336 let request = GetClusterInfoRequest {};
1337 let resp = self.inner.get_cluster_info(request).await?;
1338 Ok(resp)
1339 }
1340
1341 pub async fn reschedule(
1342 &self,
1343 worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1344 revision: u64,
1345 resolve_no_shuffle_upstream: bool,
1346 ) -> Result<(bool, u64)> {
1347 let request = RescheduleRequest {
1348 revision,
1349 resolve_no_shuffle_upstream,
1350 worker_reschedules,
1351 };
1352 let resp = self.inner.reschedule(request).await?;
1353 Ok((resp.success, resp.revision))
1354 }
1355
1356 pub async fn risectl_get_pinned_versions_summary(
1357 &self,
1358 ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1359 let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1360 self.inner
1361 .rise_ctl_get_pinned_versions_summary(request)
1362 .await
1363 }
1364
1365 pub async fn risectl_get_checkpoint_hummock_version(
1366 &self,
1367 ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1368 let request = RiseCtlGetCheckpointVersionRequest {};
1369 self.inner.rise_ctl_get_checkpoint_version(request).await
1370 }
1371
1372 pub async fn risectl_pause_hummock_version_checkpoint(
1373 &self,
1374 ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1375 let request = RiseCtlPauseVersionCheckpointRequest {};
1376 self.inner.rise_ctl_pause_version_checkpoint(request).await
1377 }
1378
1379 pub async fn risectl_resume_hummock_version_checkpoint(
1380 &self,
1381 ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1382 let request = RiseCtlResumeVersionCheckpointRequest {};
1383 self.inner.rise_ctl_resume_version_checkpoint(request).await
1384 }
1385
1386 pub async fn init_metadata_for_replay(
1387 &self,
1388 tables: Vec<PbTable>,
1389 compaction_groups: Vec<CompactionGroupInfo>,
1390 ) -> Result<()> {
1391 let req = InitMetadataForReplayRequest {
1392 tables,
1393 compaction_groups,
1394 };
1395 let _resp = self.inner.init_metadata_for_replay(req).await?;
1396 Ok(())
1397 }
1398
1399 pub async fn replay_version_delta(
1400 &self,
1401 version_delta: HummockVersionDelta,
1402 ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1403 let req = ReplayVersionDeltaRequest {
1404 version_delta: Some(version_delta.into()),
1405 };
1406 let resp = self.inner.replay_version_delta(req).await?;
1407 Ok((
1408 HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1409 resp.modified_compaction_groups,
1410 ))
1411 }
1412
1413 pub async fn list_version_deltas(
1414 &self,
1415 start_id: HummockVersionId,
1416 num_limit: u32,
1417 committed_epoch_limit: HummockEpoch,
1418 ) -> Result<Vec<HummockVersionDelta>> {
1419 let req = ListVersionDeltasRequest {
1420 start_id,
1421 num_limit,
1422 committed_epoch_limit,
1423 };
1424 Ok(self
1425 .inner
1426 .list_version_deltas(req)
1427 .await?
1428 .version_deltas
1429 .unwrap()
1430 .version_deltas
1431 .iter()
1432 .map(HummockVersionDelta::from_rpc_protobuf)
1433 .collect())
1434 }
1435
1436 pub async fn trigger_compaction_deterministic(
1437 &self,
1438 version_id: HummockVersionId,
1439 compaction_groups: Vec<CompactionGroupId>,
1440 ) -> Result<()> {
1441 let req = TriggerCompactionDeterministicRequest {
1442 version_id,
1443 compaction_groups,
1444 };
1445 self.inner.trigger_compaction_deterministic(req).await?;
1446 Ok(())
1447 }
1448
1449 pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1450 let req = DisableCommitEpochRequest {};
1451 Ok(HummockVersion::from_rpc_protobuf(
1452 &self
1453 .inner
1454 .disable_commit_epoch(req)
1455 .await?
1456 .current_version
1457 .unwrap(),
1458 ))
1459 }
1460
1461 pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1462 let req = GetAssignedCompactTaskNumRequest {};
1463 let resp = self.inner.get_assigned_compact_task_num(req).await?;
1464 Ok(resp.num_tasks as usize)
1465 }
1466
1467 pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1468 let req = RiseCtlListCompactionGroupRequest {};
1469 let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1470 Ok(resp.compaction_groups)
1471 }
1472
1473 pub async fn risectl_update_compaction_config(
1474 &self,
1475 compaction_groups: &[CompactionGroupId],
1476 configs: &[MutableConfig],
1477 ) -> Result<()> {
1478 let req = RiseCtlUpdateCompactionConfigRequest {
1479 compaction_group_ids: compaction_groups.to_vec(),
1480 configs: configs
1481 .iter()
1482 .map(
1483 |c| rise_ctl_update_compaction_config_request::MutableConfig {
1484 mutable_config: Some(c.clone()),
1485 },
1486 )
1487 .collect(),
1488 };
1489 let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1490 Ok(())
1491 }
1492
1493 pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1494 let req = BackupMetaRequest { remarks };
1495 let resp = self.inner.backup_meta(req).await?;
1496 Ok(resp.job_id)
1497 }
1498
1499 pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1500 let req = GetBackupJobStatusRequest { job_id };
1501 let resp = self.inner.get_backup_job_status(req).await?;
1502 Ok((resp.job_status(), resp.message))
1503 }
1504
1505 pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1506 let req = DeleteMetaSnapshotRequest {
1507 snapshot_ids: snapshot_ids.to_vec(),
1508 };
1509 let _resp = self.inner.delete_meta_snapshot(req).await?;
1510 Ok(())
1511 }
1512
1513 pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1514 let req = GetMetaSnapshotManifestRequest {};
1515 let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1516 Ok(resp.manifest.expect("should exist"))
1517 }
1518
1519 pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1520 let req = GetTelemetryInfoRequest {};
1521 let resp = self.inner.get_telemetry_info(req).await?;
1522 Ok(resp)
1523 }
1524
1525 pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1526 let req = GetMetaStoreInfoRequest {};
1527 let resp = self.inner.get_meta_store_info(req).await?;
1528 Ok(resp.meta_store_endpoint)
1529 }
1530
1531 pub async fn alter_sink_props(
1532 &self,
1533 sink_id: SinkId,
1534 changed_props: BTreeMap<String, String>,
1535 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1536 connector_conn_ref: Option<ConnectionId>,
1537 ) -> Result<()> {
1538 let req = AlterConnectorPropsRequest {
1539 object_id: sink_id.as_raw_id(),
1540 changed_props: changed_props.into_iter().collect(),
1541 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1542 connector_conn_ref,
1543 object_type: AlterConnectorPropsObject::Sink as i32,
1544 extra_options: None,
1545 };
1546 let _resp = self.inner.alter_connector_props(req).await?;
1547 Ok(())
1548 }
1549
1550 pub async fn alter_iceberg_table_props(
1551 &self,
1552 table_id: TableId,
1553 sink_id: SinkId,
1554 source_id: SourceId,
1555 changed_props: BTreeMap<String, String>,
1556 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1557 connector_conn_ref: Option<ConnectionId>,
1558 ) -> Result<()> {
1559 let req = AlterConnectorPropsRequest {
1560 object_id: table_id.as_raw_id(),
1561 changed_props: changed_props.into_iter().collect(),
1562 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1563 connector_conn_ref,
1564 object_type: AlterConnectorPropsObject::IcebergTable as i32,
1565 extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1566 sink_id,
1567 source_id,
1568 })),
1569 };
1570 let _resp = self.inner.alter_connector_props(req).await?;
1571 Ok(())
1572 }
1573
1574 pub async fn alter_source_connector_props(
1575 &self,
1576 source_id: SourceId,
1577 changed_props: BTreeMap<String, String>,
1578 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1579 connector_conn_ref: Option<ConnectionId>,
1580 ) -> Result<()> {
1581 let req = AlterConnectorPropsRequest {
1582 object_id: source_id.as_raw_id(),
1583 changed_props: changed_props.into_iter().collect(),
1584 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1585 connector_conn_ref,
1586 object_type: AlterConnectorPropsObject::Source as i32,
1587 extra_options: None,
1588 };
1589 let _resp = self.inner.alter_connector_props(req).await?;
1590 Ok(())
1591 }
1592
1593 pub async fn alter_connection_connector_props(
1594 &self,
1595 connection_id: u32,
1596 changed_props: BTreeMap<String, String>,
1597 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1598 ) -> Result<()> {
1599 let req = AlterConnectorPropsRequest {
1600 object_id: connection_id,
1601 changed_props: changed_props.into_iter().collect(),
1602 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1603 connector_conn_ref: None, object_type: AlterConnectorPropsObject::Connection as i32,
1605 extra_options: None,
1606 };
1607 let _resp = self.inner.alter_connector_props(req).await?;
1608 Ok(())
1609 }
1610
1611 pub async fn alter_source_properties_safe(
1614 &self,
1615 source_id: SourceId,
1616 changed_props: BTreeMap<String, String>,
1617 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1618 reset_splits: bool,
1619 ) -> Result<()> {
1620 let req = AlterSourcePropertiesSafeRequest {
1621 source_id: source_id.as_raw_id(),
1622 changed_props: changed_props.into_iter().collect(),
1623 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1624 options: Some(PropertyUpdateOptions { reset_splits }),
1625 };
1626 let _resp = self.inner.alter_source_properties_safe(req).await?;
1627 Ok(())
1628 }
1629
1630 pub async fn reset_source_splits(&self, source_id: SourceId) -> Result<()> {
1633 let req = ResetSourceSplitsRequest {
1634 source_id: source_id.as_raw_id(),
1635 };
1636 let _resp = self.inner.reset_source_splits(req).await?;
1637 Ok(())
1638 }
1639
1640 pub async fn inject_source_offsets(
1643 &self,
1644 source_id: SourceId,
1645 split_offsets: HashMap<String, String>,
1646 ) -> Result<Vec<String>> {
1647 let req = InjectSourceOffsetsRequest {
1648 source_id: source_id.as_raw_id(),
1649 split_offsets,
1650 };
1651 let resp = self.inner.inject_source_offsets(req).await?;
1652 Ok(resp.applied_split_ids)
1653 }
1654
1655 pub async fn set_system_param(
1656 &self,
1657 param: String,
1658 value: Option<String>,
1659 ) -> Result<Option<SystemParamsReader>> {
1660 let req = SetSystemParamRequest { param, value };
1661 let resp = self.inner.set_system_param(req).await?;
1662 Ok(resp.params.map(SystemParamsReader::from))
1663 }
1664
1665 pub async fn get_session_params(&self) -> Result<String> {
1666 let req = GetSessionParamsRequest {};
1667 let resp = self.inner.get_session_params(req).await?;
1668 Ok(resp.params)
1669 }
1670
1671 pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1672 let req = SetSessionParamRequest { param, value };
1673 let resp = self.inner.set_session_param(req).await?;
1674 Ok(resp.param)
1675 }
1676
1677 pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1678 let req = GetDdlProgressRequest {};
1679 let resp = self.inner.get_ddl_progress(req).await?;
1680 Ok(resp.ddl_progress)
1681 }
1682
1683 pub async fn split_compaction_group(
1684 &self,
1685 group_id: CompactionGroupId,
1686 table_ids_to_new_group: &[TableId],
1687 partition_vnode_count: u32,
1688 ) -> Result<CompactionGroupId> {
1689 let req = SplitCompactionGroupRequest {
1690 group_id,
1691 table_ids: table_ids_to_new_group.to_vec(),
1692 partition_vnode_count,
1693 };
1694 let resp = self.inner.split_compaction_group(req).await?;
1695 Ok(resp.new_group_id)
1696 }
1697
1698 pub async fn get_tables(
1699 &self,
1700 table_ids: Vec<TableId>,
1701 include_dropped_tables: bool,
1702 ) -> Result<HashMap<TableId, Table>> {
1703 let req = GetTablesRequest {
1704 table_ids,
1705 include_dropped_tables,
1706 };
1707 let resp = self.inner.get_tables(req).await?;
1708 Ok(resp.tables)
1709 }
1710
1711 pub async fn list_serving_vnode_mappings(
1712 &self,
1713 ) -> Result<HashMap<FragmentId, (JobId, WorkerSlotMapping)>> {
1714 let req = GetServingVnodeMappingsRequest {};
1715 let resp = self.inner.get_serving_vnode_mappings(req).await?;
1716 let mappings = resp
1717 .worker_slot_mappings
1718 .into_iter()
1719 .map(|p| {
1720 (
1721 p.fragment_id,
1722 (
1723 resp.fragment_to_table
1724 .get(&p.fragment_id)
1725 .cloned()
1726 .unwrap_or_default(),
1727 WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1728 ),
1729 )
1730 })
1731 .collect();
1732 Ok(mappings)
1733 }
1734
1735 pub async fn risectl_list_compaction_status(
1736 &self,
1737 ) -> Result<(
1738 Vec<CompactStatus>,
1739 Vec<CompactTaskAssignment>,
1740 Vec<CompactTaskProgress>,
1741 )> {
1742 let req = RiseCtlListCompactionStatusRequest {};
1743 let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1744 Ok((
1745 resp.compaction_statuses,
1746 resp.task_assignment,
1747 resp.task_progress,
1748 ))
1749 }
1750
1751 pub async fn get_compaction_score(
1752 &self,
1753 compaction_group_id: CompactionGroupId,
1754 ) -> Result<Vec<PickerInfo>> {
1755 let req = GetCompactionScoreRequest {
1756 compaction_group_id,
1757 };
1758 let resp = self.inner.get_compaction_score(req).await?;
1759 Ok(resp.scores)
1760 }
1761
1762 pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1763 let req = RiseCtlRebuildTableStatsRequest {};
1764 let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1765 Ok(())
1766 }
1767
1768 pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1769 let req = ListBranchedObjectRequest {};
1770 let resp = self.inner.list_branched_object(req).await?;
1771 Ok(resp.branched_objects)
1772 }
1773
1774 pub async fn list_active_write_limit(&self) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
1775 let req = ListActiveWriteLimitRequest {};
1776 let resp = self.inner.list_active_write_limit(req).await?;
1777 Ok(resp.write_limits)
1778 }
1779
1780 pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1781 let req = ListHummockMetaConfigRequest {};
1782 let resp = self.inner.list_hummock_meta_config(req).await?;
1783 Ok(resp.configs)
1784 }
1785
1786 pub async fn get_table_change_logs(
1787 &self,
1788 epoch_only: bool,
1789 start_epoch_inclusive: Option<u64>,
1790 end_epoch_inclusive: Option<u64>,
1791 table_ids: Option<HashSet<TableId>>,
1792 exclude_empty: bool,
1793 limit: Option<u32>,
1794 ) -> Result<TableChangeLogs> {
1795 let req = GetTableChangeLogsRequest {
1796 epoch_only,
1797 start_epoch_inclusive,
1798 end_epoch_inclusive,
1799 table_ids: table_ids.map(|iter| PbTableFilter {
1800 table_ids: iter.into_iter().collect::<Vec<_>>(),
1801 }),
1802 exclude_empty,
1803 limit,
1804 };
1805 let resp = self.inner.get_table_change_logs(req).await?;
1806 Ok(resp
1807 .table_change_logs
1808 .into_iter()
1809 .map(|(id, change_log)| (TableId::new(id), TableChangeLog::from_protobuf(&change_log)))
1810 .collect())
1811 }
1812
1813 pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1814 let _resp = self
1815 .inner
1816 .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1817 .await?;
1818
1819 Ok(())
1820 }
1821
1822 pub async fn rw_cloud_validate_source(
1823 &self,
1824 source_type: SourceType,
1825 source_config: HashMap<String, String>,
1826 ) -> Result<RwCloudValidateSourceResponse> {
1827 let req = RwCloudValidateSourceRequest {
1828 source_type: source_type.into(),
1829 source_config,
1830 };
1831 let resp = self.inner.rw_cloud_validate_source(req).await?;
1832 Ok(resp)
1833 }
1834
1835 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1836 self.inner.core.read().await.sink_coordinate_client.clone()
1837 }
1838
1839 pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1840 let req = ListCompactTaskAssignmentRequest {};
1841 let resp = self.inner.list_compact_task_assignment(req).await?;
1842 Ok(resp.task_assignment)
1843 }
1844
1845 pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1846 let req = ListEventLogRequest::default();
1847 let resp = self.inner.list_event_log(req).await?;
1848 Ok(resp.event_logs)
1849 }
1850
1851 pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1852 let req = ListCompactTaskProgressRequest {};
1853 let resp = self.inner.list_compact_task_progress(req).await?;
1854 Ok(resp.task_progress)
1855 }
1856
    /// `madsim` build of `try_add_panic_event_blocking`: a no-op.
    ///
    /// It has the same signature as the `#[cfg(not(madsim))]` variant, but
    /// the body is empty and both arguments are ignored.
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }
1864
1865 #[cfg(not(madsim))]
1867 pub fn try_add_panic_event_blocking(
1868 &self,
1869 panic_info: impl Display,
1870 timeout_millis: Option<u64>,
1871 ) {
1872 let event = event_log::EventWorkerNodePanic {
1873 worker_id: self.worker_id,
1874 worker_type: self.worker_type.into(),
1875 host_addr: Some(self.host_addr.to_protobuf()),
1876 panic_info: format!("{panic_info}"),
1877 };
1878 let grpc_meta_client = self.inner.clone();
1879 let _ = thread::spawn(move || {
1880 let rt = tokio::runtime::Builder::new_current_thread()
1881 .enable_all()
1882 .build()
1883 .unwrap();
1884 let req = AddEventLogRequest {
1885 event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1886 };
1887 rt.block_on(async {
1888 let _ = tokio::time::timeout(
1889 Duration::from_millis(timeout_millis.unwrap_or(1000)),
1890 grpc_meta_client.add_event_log(req),
1891 )
1892 .await;
1893 });
1894 })
1895 .join();
1896 }
1897
1898 pub async fn add_sink_fail_evet(
1899 &self,
1900 sink_id: SinkId,
1901 sink_name: String,
1902 connector: String,
1903 error: String,
1904 ) -> Result<()> {
1905 let event = event_log::EventSinkFail {
1906 sink_id,
1907 sink_name,
1908 connector,
1909 error,
1910 };
1911 let req = AddEventLogRequest {
1912 event: Some(add_event_log_request::Event::SinkFail(event)),
1913 };
1914 self.inner.add_event_log(req).await?;
1915 Ok(())
1916 }
1917
1918 pub async fn add_cdc_auto_schema_change_fail_event(
1919 &self,
1920 source_id: SourceId,
1921 table_name: String,
1922 cdc_table_id: String,
1923 upstream_ddl: String,
1924 fail_info: String,
1925 ) -> Result<()> {
1926 let event = event_log::EventAutoSchemaChangeFail {
1927 table_id: source_id.as_cdc_table_id(),
1928 table_name,
1929 cdc_table_id,
1930 upstream_ddl,
1931 fail_info,
1932 };
1933 let req = AddEventLogRequest {
1934 event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1935 };
1936 self.inner.add_event_log(req).await?;
1937 Ok(())
1938 }
1939
1940 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1941 let req = CancelCompactTaskRequest {
1942 task_id,
1943 task_status: task_status as _,
1944 };
1945 let resp = self.inner.cancel_compact_task(req).await?;
1946 Ok(resp.ret)
1947 }
1948
1949 pub async fn get_version_by_epoch(
1950 &self,
1951 epoch: HummockEpoch,
1952 table_id: TableId,
1953 ) -> Result<PbHummockVersion> {
1954 let req = GetVersionByEpochRequest { epoch, table_id };
1955 let resp = self.inner.get_version_by_epoch(req).await?;
1956 Ok(resp.version.unwrap())
1957 }
1958
1959 pub async fn get_cluster_limits(
1960 &self,
1961 ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1962 let req = GetClusterLimitsRequest {};
1963 let resp = self.inner.get_cluster_limits(req).await?;
1964 Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1965 }
1966
1967 pub async fn merge_compaction_group(
1968 &self,
1969 left_group_id: CompactionGroupId,
1970 right_group_id: CompactionGroupId,
1971 ) -> Result<()> {
1972 let req = MergeCompactionGroupRequest {
1973 left_group_id,
1974 right_group_id,
1975 };
1976 self.inner.merge_compaction_group(req).await?;
1977 Ok(())
1978 }
1979
1980 pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1982 let request = ListRateLimitsRequest {};
1983 let resp = self.inner.list_rate_limits(request).await?;
1984 Ok(resp.rate_limits)
1985 }
1986
1987 pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1988 let request = ListCdcProgressRequest {};
1989 let resp = self.inner.list_cdc_progress(request).await?;
1990 Ok(resp.cdc_progress)
1991 }
1992
1993 pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1994 let request = ListRefreshTableStatesRequest {};
1995 let resp = self.inner.list_refresh_table_states(request).await?;
1996 Ok(resp.states)
1997 }
1998
1999 pub async fn list_iceberg_compaction_status(
2000 &self,
2001 ) -> Result<Vec<list_iceberg_compaction_status_response::IcebergCompactionStatus>> {
2002 let request = ListIcebergCompactionStatusRequest {};
2003 let resp = self.inner.list_iceberg_compaction_status(request).await?;
2004 Ok(resp.statuses)
2005 }
2006
2007 pub async fn list_sink_log_store_tables(
2008 &self,
2009 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
2010 let request = ListSinkLogStoreTablesRequest {};
2011 let resp = self.inner.list_sink_log_store_tables(request).await?;
2012 Ok(resp.tables)
2013 }
2014
2015 pub async fn create_iceberg_table(
2016 &self,
2017 table_job_info: PbTableJobInfo,
2018 sink_job_info: PbSinkJobInfo,
2019 iceberg_source: PbSource,
2020 if_not_exists: bool,
2021 ) -> Result<WaitVersion> {
2022 let request = CreateIcebergTableRequest {
2023 table_info: Some(table_job_info),
2024 sink_info: Some(sink_job_info),
2025 iceberg_source: Some(iceberg_source),
2026 if_not_exists,
2027 };
2028
2029 let resp = Box::pin(self.inner.create_iceberg_table(request)).await?;
2030 Ok(resp
2031 .version
2032 .ok_or_else(|| anyhow!("wait version not set"))?)
2033 }
2034
2035 pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
2036 let request = ListIcebergTablesRequest {};
2037 let resp = self.inner.list_iceberg_tables(request).await?;
2038 Ok(resp.iceberg_tables)
2039 }
2040
2041 pub async fn get_cluster_stack_trace(
2043 &self,
2044 actor_traces_format: ActorTracesFormat,
2045 ) -> Result<StackTraceResponse> {
2046 let request = StackTraceRequest {
2047 actor_traces_format: actor_traces_format as i32,
2048 };
2049 let resp = self.inner.stack_trace(request).await?;
2050 Ok(resp)
2051 }
2052
2053 pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
2054 let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
2055 self.inner.set_sync_log_store_aligned(request).await?;
2056 Ok(())
2057 }
2058
2059 pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
2060 self.inner.refresh(request).await
2061 }
2062
2063 pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
2064 let request = ListUnmigratedTablesRequest {};
2065 let resp = self.inner.list_unmigrated_tables(request).await?;
2066 Ok(resp
2067 .tables
2068 .into_iter()
2069 .map(|table| (table.table_id, table.table_name))
2070 .collect())
2071 }
2072}
2073
2074#[async_trait]
2075impl HummockMetaClient for MetaClient {
2076 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
2077 let req = UnpinVersionBeforeRequest {
2078 context_id: self.worker_id(),
2079 unpin_version_before,
2080 };
2081 self.inner.unpin_version_before(req).await?;
2082 Ok(())
2083 }
2084
2085 async fn get_current_version(&self) -> Result<HummockVersion> {
2086 let req = GetCurrentVersionRequest::default();
2087 Ok(HummockVersion::from_rpc_protobuf(
2088 &self
2089 .inner
2090 .get_current_version(req)
2091 .await?
2092 .current_version
2093 .unwrap(),
2094 ))
2095 }
2096
2097 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
2098 let resp = self
2099 .inner
2100 .get_new_object_ids(GetNewObjectIdsRequest { number })
2101 .await?;
2102 Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
2103 }
2104
    /// Not supported by this client: always panics.
    ///
    /// All arguments are ignored.
    ///
    /// # Panics
    ///
    /// Panics unconditionally with
    /// "Only meta service can commit_epoch in production."
    async fn commit_epoch_with_change_log(
        &self,
        _epoch: HummockEpoch,
        _sync_result: SyncResult,
        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
    ) -> Result<()> {
        panic!("Only meta service can commit_epoch in production.")
    }
2113
2114 async fn trigger_manual_compaction(
2115 &self,
2116 compaction_group_id: CompactionGroupId,
2117 table_id: JobId,
2118 level: u32,
2119 target_level: Option<u32>,
2120 sst_ids: Vec<HummockSstableId>,
2121 exclusive: bool,
2122 ) -> Result<bool> {
2123 let req = TriggerManualCompactionRequest {
2125 compaction_group_id,
2126 table_id,
2127 level,
2130 target_level,
2131 sst_ids,
2132 exclusive: Some(exclusive),
2133 ..Default::default()
2134 };
2135
2136 let resp = self.inner.trigger_manual_compaction(req).await?;
2137 Ok(resp.should_retry.unwrap_or(false))
2138 }
2139
2140 async fn trigger_full_gc(
2141 &self,
2142 sst_retention_time_sec: u64,
2143 prefix: Option<String>,
2144 ) -> Result<()> {
2145 self.inner
2146 .trigger_full_gc(TriggerFullGcRequest {
2147 sst_retention_time_sec,
2148 prefix,
2149 })
2150 .await?;
2151 Ok(())
2152 }
2153
2154 async fn subscribe_compaction_event(
2155 &self,
2156 ) -> Result<(
2157 UnboundedSender<SubscribeCompactionEventRequest>,
2158 BoxStream<'static, CompactionEventItem>,
2159 )> {
2160 let (request_sender, request_receiver) =
2161 unbounded_channel::<SubscribeCompactionEventRequest>();
2162 request_sender
2163 .send(SubscribeCompactionEventRequest {
2164 event: Some(subscribe_compaction_event_request::Event::Register(
2165 Register {
2166 context_id: self.worker_id(),
2167 },
2168 )),
2169 create_at: SystemTime::now()
2170 .duration_since(SystemTime::UNIX_EPOCH)
2171 .expect("Clock may have gone backwards")
2172 .as_millis() as u64,
2173 })
2174 .context("Failed to subscribe compaction event")?;
2175
2176 let stream = self
2177 .inner
2178 .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
2179 request_receiver,
2180 )))
2181 .await?;
2182
2183 Ok((request_sender, Box::pin(stream)))
2184 }
2185
    /// Returns the `PbHummockVersion` produced by
    /// `self.get_version_by_epoch(epoch, table_id)`.
    ///
    /// NOTE(review): the call in the body has the same name as this trait method;
    /// presumably it resolves to an inherent `MetaClient` method defined elsewhere
    /// in the file rather than recursing — confirm.
    async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Result<PbHummockVersion> {
        self.get_version_by_epoch(epoch, table_id).await
    }
2193
2194 async fn subscribe_iceberg_compaction_event(
2195 &self,
2196 ) -> Result<(
2197 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2198 BoxStream<'static, IcebergCompactionEventItem>,
2199 )> {
2200 let (request_sender, request_receiver) =
2201 unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2202 request_sender
2203 .send(SubscribeIcebergCompactionEventRequest {
2204 event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2205 IcebergRegister {
2206 context_id: self.worker_id(),
2207 },
2208 )),
2209 create_at: SystemTime::now()
2210 .duration_since(SystemTime::UNIX_EPOCH)
2211 .expect("Clock may have gone backwards")
2212 .as_millis() as u64,
2213 })
2214 .context("Failed to subscribe compaction event")?;
2215
2216 let stream = self
2217 .inner
2218 .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2219 request_receiver,
2220 )))
2221 .await?;
2222
2223 Ok((request_sender, Box::pin(stream)))
2224 }
2225
    /// Returns the `TableChangeLogs` produced by `self.get_table_change_logs(..)`,
    /// passing every argument through unchanged and in the same order.
    ///
    /// NOTE(review): the call in the body has the same name as this trait method;
    /// presumably it resolves to an inherent `MetaClient` method defined elsewhere
    /// in the file rather than recursing — confirm.
    async fn get_table_change_logs(
        &self,
        epoch_only: bool,
        start_epoch_inclusive: Option<u64>,
        end_epoch_inclusive: Option<u64>,
        table_ids: Option<HashSet<TableId>>,
        exclude_empty: bool,
        limit: Option<u32>,
    ) -> Result<TableChangeLogs> {
        self.get_table_change_logs(
            epoch_only,
            start_epoch_inclusive,
            end_epoch_inclusive,
            table_ids,
            exclude_empty,
            limit,
        )
        .await
    }
2245}
2246
2247#[async_trait]
2248impl TelemetryInfoFetcher for MetaClient {
2249 async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2250 let resp = self
2251 .get_telemetry_info()
2252 .await
2253 .map_err(|e| e.to_report_string())?;
2254 let tracking_id = resp.get_tracking_id().ok();
2255 Ok(tracking_id.map(|id| id.to_owned()))
2256 }
2257}
2258
/// Alias for the sink coordination service client bound to `Channel`.
pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2260
/// Bundle of per-service gRPC clients for the meta server.
///
/// `GrpcMetaClientCore::new` builds every client from the same `Channel`. The field
/// names are the client identifiers referenced by the entries of `for_all_meta_rpc!`.
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}
2284
2285impl GrpcMetaClientCore {
2286 pub(crate) fn new(channel: Channel) -> Self {
2287 let cluster_client = ClusterServiceClient::new(channel.clone());
2288 let meta_member_client = MetaMemberClient::new(channel.clone());
2289 let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2290 let ddl_client =
2291 DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2292 let hummock_client =
2293 HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2294 let notification_client =
2295 NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2296 let stream_client =
2297 StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2298 let user_client = UserServiceClient::new(channel.clone());
2299 let scale_client =
2300 ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2301 let backup_client = BackupServiceClient::new(channel.clone());
2302 let telemetry_client =
2303 TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2304 let system_params_client = SystemParamsServiceClient::new(channel.clone());
2305 let session_params_client = SessionParamServiceClient::new(channel.clone());
2306 let serving_client = ServingServiceClient::new(channel.clone());
2307 let cloud_client = CloudServiceClient::new(channel.clone());
2308 let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2309 .max_decoding_message_size(usize::MAX);
2310 let event_log_client = EventLogServiceClient::new(channel.clone());
2311 let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2312 let hosted_iceberg_catalog_service_client =
2313 HostedIcebergCatalogServiceClient::new(channel.clone());
2314 let monitor_client = configured_monitor_service_client(MonitorServiceClient::new(channel));
2315
2316 GrpcMetaClientCore {
2317 cluster_client,
2318 meta_member_client,
2319 heartbeat_client,
2320 ddl_client,
2321 hummock_client,
2322 notification_client,
2323 stream_client,
2324 user_client,
2325 scale_client,
2326 backup_client,
2327 telemetry_client,
2328 system_params_client,
2329 session_params_client,
2330 serving_client,
2331 cloud_client,
2332 sink_coordinate_client,
2333 event_log_client,
2334 cluster_limit_client,
2335 hosted_iceberg_catalog_service_client,
2336 monitor_client,
2337 }
2338 }
2339}
2340
/// gRPC client handle for the meta server.
///
/// Clones share the same `core` through the `Arc`.
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    /// Channel to the member monitor task. Each message is a oneshot sender on
    /// which the monitor reports the result of the refresh it performed.
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    /// The per-service clients, behind a lock so they can be swapped as a whole.
    core: Arc<RwLock<GrpcMetaClientCore>>,
}
2349
/// Alias for the meta member service client bound to `Channel`.
type MetaMemberClient = MetaMemberServiceClient<Channel>;
2351
/// Known meta member addresses, each with an optional cached client.
struct MetaMemberGroup {
    /// Keyed by member URI. A `None` value means no client has been created for
    /// that address yet; `refresh_members` fills it in on first use.
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}
2355
/// State owned by the member monitor task spawned in `start_meta_member_monitor`.
struct MetaMemberManagement {
    /// Shared client core; replaced by `recreate_core` when a new leader is found.
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    /// Either a single member client or a group of member addresses with cached clients.
    members: Either<MetaMemberClient, MetaMemberGroup>,
    /// Leader address that discovered leaders are compared against.
    current_leader: http::Uri,
    /// Read for `meta_leader_lease_secs` when bounding the reconnect retries.
    meta_config: Arc<MetaConfig>,
}
2362
2363impl MetaMemberManagement {
    /// Period of the interval ticker created in `start_meta_member_monitor`.
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2365
2366 fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2367 format!("http://{}:{}", addr.host, addr.port)
2368 .parse()
2369 .unwrap()
2370 }
2371
2372 async fn recreate_core(&self, channel: Channel) {
2373 let mut core = self.core_ref.write().await;
2374 *core = GrpcMetaClientCore::new(channel);
2375 }
2376
2377 async fn refresh_members(&mut self) -> Result<()> {
2378 let leader_addr = match self.members.as_mut() {
2379 Either::Left(client) => {
2380 let resp = client
2381 .to_owned()
2382 .members(MembersRequest {})
2383 .await
2384 .map_err(RpcError::from_meta_status)?;
2385 let resp = resp.into_inner();
2386 resp.members.into_iter().find(|member| member.is_leader)
2387 }
2388 Either::Right(member_group) => {
2389 let mut fetched_members = None;
2390
2391 for (addr, client) in &mut member_group.members {
2392 let members: Result<_> = try {
2393 let mut client = match client {
2394 Some(cached_client) => cached_client.to_owned(),
2395 None => {
2396 let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2397 let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2398 .await
2399 .context("failed to create client")
2400 .map_err(RpcError::from)?;
2401 let new_client: MetaMemberClient =
2402 MetaMemberServiceClient::new(channel);
2403 *client = Some(new_client.clone());
2404
2405 new_client
2406 }
2407 };
2408
2409 let resp = client
2410 .members(MembersRequest {})
2411 .await
2412 .context("failed to fetch members")
2413 .map_err(RpcError::from)?;
2414
2415 resp.into_inner().members
2416 };
2417
2418 let fetched = members.is_ok();
2419 fetched_members = Some(members);
2420 if fetched {
2421 break;
2422 }
2423 }
2424
2425 let members = fetched_members
2426 .context("no member available in the list")?
2427 .context("could not refresh members")?;
2428
2429 let mut leader = None;
2431 for member in members {
2432 if member.is_leader {
2433 leader = Some(member.clone());
2434 }
2435
2436 let addr = Self::host_address_to_uri(member.address.unwrap());
2437 if !member_group.members.contains(&addr) {
2439 tracing::info!("new meta member joined: {}", addr);
2440 member_group.members.put(addr, None);
2441 }
2442 }
2443
2444 leader
2445 }
2446 };
2447
2448 if let Some(leader) = leader_addr {
2449 let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2450
2451 if discovered_leader != self.current_leader {
2452 tracing::info!("new meta leader {} discovered", discovered_leader);
2453
2454 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2455 Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2456 false,
2457 );
2458
2459 let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2460 let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2461 GrpcMetaClient::connect_to_endpoint(endpoint).await
2462 })
2463 .await?;
2464
2465 self.recreate_core(channel).await;
2466 self.current_leader = discovered_leader;
2467 }
2468 }
2469
2470 Ok(())
2471 }
2472}
2473
2474impl GrpcMetaClient {
    /// Seconds passed to `http2_keep_alive_interval` in `connect_to_endpoint`.
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    /// Seconds passed to `keep_alive_timeout` in `connect_to_endpoint`.
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    /// Milliseconds passed to `ExponentialBackoff::from_millis` in `retry_strategy_to_bound`.
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    /// Milliseconds used as the backoff's `max_delay` in `retry_strategy_to_bound`.
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2483
2484 fn start_meta_member_monitor(
2485 &self,
2486 init_leader_addr: http::Uri,
2487 members: Either<MetaMemberClient, MetaMemberGroup>,
2488 force_refresh_receiver: Receiver<Sender<Result<()>>>,
2489 meta_config: Arc<MetaConfig>,
2490 ) -> Result<()> {
2491 let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2492 let current_leader = init_leader_addr;
2493
2494 let enable_period_tick = matches!(members, Either::Right(_));
2495
2496 let member_management = MetaMemberManagement {
2497 core_ref,
2498 members,
2499 current_leader,
2500 meta_config,
2501 };
2502
2503 let mut force_refresh_receiver = force_refresh_receiver;
2504
2505 tokio::spawn(async move {
2506 let mut member_management = member_management;
2507 let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2508
2509 loop {
2510 let event: Option<Sender<Result<()>>> = if enable_period_tick {
2511 tokio::select! {
2512 _ = ticker.tick() => None,
2513 result_sender = force_refresh_receiver.recv() => {
2514 if result_sender.is_none() {
2515 break;
2516 }
2517
2518 result_sender
2519 },
2520 }
2521 } else {
2522 let result_sender = force_refresh_receiver.recv().await;
2523
2524 if result_sender.is_none() {
2525 break;
2526 }
2527
2528 result_sender
2529 };
2530
2531 let tick_result = member_management.refresh_members().await;
2532 if let Err(e) = tick_result.as_ref() {
2533 tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2534 }
2535
2536 if let Some(sender) = event {
2537 let _resp = sender.send(tick_result);
2539 }
2540 }
2541 });
2542
2543 Ok(())
2544 }
2545
2546 async fn force_refresh_leader(&self) -> Result<()> {
2547 let (sender, receiver) = oneshot::channel();
2548
2549 self.member_monitor_event_sender
2550 .send(sender)
2551 .await
2552 .map_err(|e| anyhow!(e))?;
2553
2554 receiver.await.map_err(|e| anyhow!(e))?
2555 }
2556
2557 pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2559 let (channel, addr) = match strategy {
2560 MetaAddressStrategy::LoadBalance(addr) => {
2561 Self::try_build_rpc_channel(vec![addr.clone()]).await
2562 }
2563 MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2564 }?;
2565 let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2566 let client = GrpcMetaClient {
2567 member_monitor_event_sender: force_refresh_sender,
2568 core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2569 };
2570
2571 let meta_member_client = client.core.read().await.meta_member_client.clone();
2572 let members = match strategy {
2573 MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2574 MetaAddressStrategy::List(addrs) => {
2575 let mut members = LruCache::new(20.try_into().unwrap());
2576 for addr in addrs {
2577 members.put(addr.clone(), None);
2578 }
2579 members.put(addr.clone(), Some(meta_member_client));
2580
2581 Either::Right(MetaMemberGroup { members })
2582 }
2583 };
2584
2585 client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2586
2587 client.force_refresh_leader().await?;
2588
2589 Ok(client)
2590 }
2591
2592 fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2593 Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2594 }
2595
2596 pub(crate) async fn try_build_rpc_channel(
2597 addrs: impl IntoIterator<Item = http::Uri>,
2598 ) -> Result<(Channel, http::Uri)> {
2599 let endpoints: Vec<_> = addrs
2600 .into_iter()
2601 .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2602 .collect();
2603
2604 let mut last_error = None;
2605
2606 for (endpoint, addr) in endpoints {
2607 match Self::connect_to_endpoint(endpoint).await {
2608 Ok(channel) => {
2609 tracing::info!("Connect to meta server {} successfully", addr);
2610 return Ok((channel, addr));
2611 }
2612 Err(e) => {
2613 tracing::warn!(
2614 error = %e.as_report(),
2615 "Failed to connect to meta server {}, trying again",
2616 addr,
2617 );
2618 last_error = Some(e);
2619 }
2620 }
2621 }
2622
2623 if let Some(last_error) = last_error {
2624 Err(anyhow::anyhow!(last_error)
2625 .context("failed to connect to all meta servers")
2626 .into())
2627 } else {
2628 bail!("no meta server address provided")
2629 }
2630 }
2631
2632 async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2633 let channel = endpoint
2634 .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2635 .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2636 .connect_timeout(Duration::from_secs(5))
2637 .monitored_connect("grpc-meta-client", Default::default())
2638 .await?
2639 .wrapped();
2640
2641 Ok(channel)
2642 }
2643
2644 pub(crate) fn retry_strategy_to_bound(
2645 high_bound: Duration,
2646 exceed: bool,
2647 ) -> impl Iterator<Item = Duration> {
2648 let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2649 .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2650 .map(jitter);
2651
2652 let mut sum = Duration::default();
2653
2654 iter.take_while(move |duration| {
2655 sum += *duration;
2656
2657 if exceed {
2658 sum < high_bound + *duration
2659 } else {
2660 sum < high_bound
2661 }
2662 })
2663 }
2664}
2665
// Table of meta RPCs, handed as a whole to the macro named by `$macro`.
//
// Each entry has the shape `{ client, method, RequestType, ResponseType }`, where
// `client` is the name of a field of `GrpcMetaClientCore` and `method` is the RPC
// method name. What is generated per entry is decided by the macro passed in.
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
             { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_sink_log_store_tables, ListSinkLogStoreTablesRequest, ListSinkLogStoreTablesResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
            ,{ stream_client, list_iceberg_compaction_status, ListIcebergCompactionStatusRequest, ListIcebergCompactionStatusResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, alter_source_properties_safe, AlterSourcePropertiesSafeRequest, AlterSourcePropertiesSafeResponse }
            ,{ stream_client, reset_source_splits, ResetSourceSplitsRequest, ResetSourceSplitsResponse }
            ,{ stream_client, inject_source_offsets, InjectSourceOffsetsRequest, InjectSourceOffsetsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_subscription_retention, AlterSubscriptionRetentionRequest, AlterSubscriptionRetentionResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_backfill_parallelism, AlterBackfillParallelismRequest, AlterBackfillParallelismResponse }
            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_resource_group, AlterDatabaseResourceGroupRequest, AlterDatabaseResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, risectl_resume_backfill, RisectlResumeBackfillRequest, RisectlResumeBackfillResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ hummock_client, get_table_change_logs, GetTableChangeLogsRequest, GetTableChangeLogsResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}
2815
2816impl GrpcMetaClient {
2817 async fn refresh_client_if_needed(&self, code: Code) {
2818 if matches!(
2819 code,
2820 Code::Unknown | Code::Unimplemented | Code::Unavailable
2821 ) {
2822 tracing::debug!("matching tonic code {}", code);
2823 let (result_sender, result_receiver) = oneshot::channel();
2824 if self
2825 .member_monitor_event_sender
2826 .try_send(result_sender)
2827 .is_ok()
2828 {
2829 if let Ok(Err(e)) = result_receiver.await {
2830 tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2831 }
2832 } else {
2833 tracing::debug!("skipping the current refresh, somewhere else is already doing it")
2834 }
2835 }
2836 }
2837}
2838
// Passes the full RPC table of `for_all_meta_rpc!` to `meta_rpc_client_method_impl`.
// NOTE(review): `meta_rpc_client_method_impl` is defined elsewhere; presumably it
// emits one method on `GrpcMetaClient` per table entry — confirm.
impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}