1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{
33 AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
34};
35use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
36use risingwave_common::hash::WorkerSlotMapping;
37use risingwave_common::id::{
38 ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, UserId, ViewId, WorkerId,
39};
40use risingwave_common::monitor::EndpointExt;
41use risingwave_common::system_param::reader::SystemParamsReader;
42use risingwave_common::telemetry::report::TelemetryInfoFetcher;
43use risingwave_common::util::addr::HostAddr;
44use risingwave_common::util::meta_addr::MetaAddressStrategy;
45use risingwave_common::util::resource_util::cpu::total_cpu_available;
46use risingwave_common::util::resource_util::hostname;
47use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
48use risingwave_error::bail;
49use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
50use risingwave_hummock_sdk::change_log::{TableChangeLog, TableChangeLogs};
51use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
52use risingwave_hummock_sdk::{
53 CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
54};
55use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
56use risingwave_pb::backup_service::*;
57use risingwave_pb::catalog::{
58 Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
59 PbSubscription, PbTable, PbView, Table,
60};
61use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
62use risingwave_pb::cloud_service::*;
63use risingwave_pb::common::worker_node::Property;
64use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
65use risingwave_pb::configured_monitor_service_client;
66use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
67use risingwave_pb::ddl_service::alter_owner_request::Object;
68use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
69use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
70use risingwave_pb::ddl_service::*;
71use risingwave_pb::hummock::compact_task::TaskStatus;
72use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
73use risingwave_pb::hummock::get_table_change_logs_request::PbTableFilter;
74use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
75use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
76use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
77use risingwave_pb::hummock::write_limits::WriteLimit;
78use risingwave_pb::hummock::*;
79use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
80use risingwave_pb::iceberg_compaction::{
81 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
82 subscribe_iceberg_compaction_event_request,
83};
84use risingwave_pb::id::{ActorId, FragmentId, HummockSstableId, SourceId};
85use risingwave_pb::meta::alter_connector_props_request::{
86 AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
87};
88use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
89use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
90use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
91use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
92use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
93use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
94use risingwave_pb::meta::list_actor_states_response::ActorState;
95use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
96use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
97use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
98use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
99use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
100use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
101use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
102use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
103use risingwave_pb::meta::serving_service_client::ServingServiceClient;
104use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
105use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
106use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
107use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
108use risingwave_pb::meta::{FragmentDistribution, *};
109use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
110use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
111use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
112use risingwave_pb::secret::PbSecretRef;
113use risingwave_pb::stream_plan::StreamFragmentGraph;
114use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
115use risingwave_pb::user::update_user_request::UpdateField;
116use risingwave_pb::user::user_service_client::UserServiceClient;
117use risingwave_pb::user::*;
118use thiserror_ext::AsReport;
119use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
120use tokio::sync::oneshot::Sender;
121use tokio::sync::{RwLock, mpsc, oneshot};
122use tokio::task::JoinHandle;
123use tokio::time::{self};
124use tokio_retry::strategy::{ExponentialBackoff, jitter};
125use tokio_stream::wrappers::UnboundedReceiverStream;
126use tonic::transport::Endpoint;
127use tonic::{Code, Request, Streaming};
128
129use crate::channel::{Channel, WrappedChannelExt};
130use crate::error::{Result, RpcError};
131use crate::hummock_meta_client::{
132 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
133 IcebergCompactionEventItem,
134};
135use crate::meta_rpc_client_method_impl;
136
137#[derive(Clone, Debug)]
139pub struct MetaClient {
140 worker_id: WorkerId,
141 worker_type: WorkerType,
142 host_addr: HostAddr,
143 inner: GrpcMetaClient,
144 meta_config: Arc<MetaConfig>,
145 cluster_id: String,
146 shutting_down: Arc<AtomicBool>,
147}
148
149impl MetaClient {
150 pub fn worker_id(&self) -> WorkerId {
151 self.worker_id
152 }
153
154 pub fn host_addr(&self) -> &HostAddr {
155 &self.host_addr
156 }
157
158 pub fn worker_type(&self) -> WorkerType {
159 self.worker_type
160 }
161
162 pub fn cluster_id(&self) -> &str {
163 &self.cluster_id
164 }
165
166 pub async fn subscribe(
168 &self,
169 subscribe_type: SubscribeType,
170 ) -> Result<Streaming<SubscribeResponse>> {
171 let request = SubscribeRequest {
172 subscribe_type: subscribe_type as i32,
173 host: Some(self.host_addr.to_protobuf()),
174 worker_id: self.worker_id(),
175 };
176
177 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
178 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
179 true,
180 );
181
182 tokio_retry::Retry::spawn(retry_strategy, || async {
183 let request = request.clone();
184 self.inner.subscribe(request).await
185 })
186 .await
187 }
188
189 pub async fn create_connection(
190 &self,
191 connection_name: String,
192 database_id: DatabaseId,
193 schema_id: SchemaId,
194 owner_id: UserId,
195 req: create_connection_request::Payload,
196 ) -> Result<WaitVersion> {
197 let request = CreateConnectionRequest {
198 name: connection_name,
199 database_id,
200 schema_id,
201 owner_id,
202 payload: Some(req),
203 };
204 let resp = self.inner.create_connection(request).await?;
205 Ok(resp
206 .version
207 .ok_or_else(|| anyhow!("wait version not set"))?)
208 }
209
210 pub async fn create_secret(
211 &self,
212 secret_name: String,
213 database_id: DatabaseId,
214 schema_id: SchemaId,
215 owner_id: UserId,
216 value: Vec<u8>,
217 ) -> Result<WaitVersion> {
218 let request = CreateSecretRequest {
219 name: secret_name,
220 database_id,
221 schema_id,
222 owner_id,
223 value,
224 };
225 let resp = self.inner.create_secret(request).await?;
226 Ok(resp
227 .version
228 .ok_or_else(|| anyhow!("wait version not set"))?)
229 }
230
231 pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
232 let request = ListConnectionsRequest {};
233 let resp = self.inner.list_connections(request).await?;
234 Ok(resp.connections)
235 }
236
237 pub async fn drop_connection(
238 &self,
239 connection_id: ConnectionId,
240 cascade: bool,
241 ) -> Result<WaitVersion> {
242 let request = DropConnectionRequest {
243 connection_id,
244 cascade,
245 };
246 let resp = self.inner.drop_connection(request).await?;
247 Ok(resp
248 .version
249 .ok_or_else(|| anyhow!("wait version not set"))?)
250 }
251
252 pub async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<WaitVersion> {
253 let request = DropSecretRequest { secret_id, cascade };
254 let resp = self.inner.drop_secret(request).await?;
255 Ok(resp
256 .version
257 .ok_or_else(|| anyhow!("wait version not set"))?)
258 }
259
260 pub async fn register_new(
264 addr_strategy: MetaAddressStrategy,
265 worker_type: WorkerType,
266 addr: &HostAddr,
267 property: Property,
268 meta_config: Arc<MetaConfig>,
269 ) -> (Self, SystemParamsReader) {
270 let ret =
271 Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
272
273 match ret {
274 Ok(ret) => ret,
275 Err(err) => {
276 tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
277 std::process::exit(1);
278 }
279 }
280 }
281
282 async fn register_new_inner(
283 addr_strategy: MetaAddressStrategy,
284 worker_type: WorkerType,
285 addr: &HostAddr,
286 property: Property,
287 meta_config: Arc<MetaConfig>,
288 ) -> Result<(Self, SystemParamsReader)> {
289 tracing::info!("register meta client using strategy: {}", addr_strategy);
290
291 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
293 Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
294 true,
295 );
296
297 let init_result: Result<_> = tokio_retry::RetryIf::spawn(
298 retry_strategy,
299 || async {
300 let grpc_meta_client =
301 GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
302
303 let add_worker_resp = grpc_meta_client
304 .add_worker_node(AddWorkerNodeRequest {
305 worker_type: worker_type as i32,
306 host: Some(addr.to_protobuf()),
307 property: Some(property.clone()),
308 resource: Some(risingwave_pb::common::worker_node::Resource {
309 rw_version: RW_VERSION.to_owned(),
310 total_memory_bytes: system_memory_available_bytes() as _,
311 total_cpu_cores: total_cpu_available() as _,
312 hostname: hostname(),
313 }),
314 })
315 .await
316 .context("failed to add worker node")?;
317
318 let system_params_resp = grpc_meta_client
319 .get_system_params(GetSystemParamsRequest {})
320 .await
321 .context("failed to get initial system params")?;
322
323 Ok((add_worker_resp, system_params_resp, grpc_meta_client))
324 },
325 |e: &RpcError| !e.is_from_tonic_server_impl(),
328 )
329 .await;
330
331 let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
332 let worker_id = add_worker_resp
333 .node_id
334 .expect("AddWorkerNodeResponse::node_id is empty");
335
336 let meta_client = Self {
337 worker_id,
338 worker_type,
339 host_addr: addr.clone(),
340 inner: grpc_meta_client,
341 meta_config: meta_config.clone(),
342 cluster_id: add_worker_resp.cluster_id,
343 shutting_down: Arc::new(false.into()),
344 };
345
346 static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
347 REPORT_PANIC.call_once(|| {
348 let meta_client_clone = meta_client.clone();
349 std::panic::update_hook(move |default_hook, info| {
350 meta_client_clone.try_add_panic_event_blocking(info, None);
352 default_hook(info);
353 });
354 });
355
356 Ok((meta_client, system_params_resp.params.unwrap().into()))
357 }
358
359 pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
361 let request = ActivateWorkerNodeRequest {
362 host: Some(addr.to_protobuf()),
363 node_id: self.worker_id,
364 };
365 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
366 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
367 true,
368 );
369 tokio_retry::Retry::spawn(retry_strategy, || async {
370 let request = request.clone();
371 self.inner.activate_worker_node(request).await
372 })
373 .await?;
374
375 Ok(())
376 }
377
378 pub async fn send_heartbeat(&self) -> Result<()> {
380 let request = HeartbeatRequest {
381 node_id: self.worker_id,
382 resource: Some(risingwave_pb::common::worker_node::Resource {
383 rw_version: RW_VERSION.to_owned(),
384 total_memory_bytes: system_memory_available_bytes() as _,
385 total_cpu_cores: total_cpu_available() as _,
386 hostname: hostname(),
387 }),
388 };
389 let resp = self.inner.heartbeat(request).await?;
390 if let Some(status) = resp.status
391 && status.code() == risingwave_pb::common::status::Code::UnknownWorker
392 {
393 if !self.shutting_down.load(Relaxed) {
396 tracing::error!(message = status.message, "worker expired");
397 std::process::exit(1);
398 }
399 }
400 Ok(())
401 }
402
403 pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
404 let request = CreateDatabaseRequest { db: Some(db) };
405 let resp = self.inner.create_database(request).await?;
406 Ok(resp
408 .version
409 .ok_or_else(|| anyhow!("wait version not set"))?)
410 }
411
412 pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
413 let request = CreateSchemaRequest {
414 schema: Some(schema),
415 };
416 let resp = self.inner.create_schema(request).await?;
417 Ok(resp
419 .version
420 .ok_or_else(|| anyhow!("wait version not set"))?)
421 }
422
423 pub async fn create_materialized_view(
424 &self,
425 table: PbTable,
426 graph: StreamFragmentGraph,
427 dependencies: HashSet<ObjectId>,
428 resource_type: streaming_job_resource_type::ResourceType,
429 if_not_exists: bool,
430 ) -> Result<WaitVersion> {
431 let request = CreateMaterializedViewRequest {
432 materialized_view: Some(table),
433 fragment_graph: Some(graph),
434 resource_type: Some(PbStreamingJobResourceType {
435 resource_type: Some(resource_type),
436 }),
437 dependencies: dependencies.into_iter().collect(),
438 if_not_exists,
439 };
440 let resp = self.inner.create_materialized_view(request).await?;
441 Ok(resp
443 .version
444 .ok_or_else(|| anyhow!("wait version not set"))?)
445 }
446
447 pub async fn drop_materialized_view(
448 &self,
449 table_id: TableId,
450 cascade: bool,
451 ) -> Result<WaitVersion> {
452 let request = DropMaterializedViewRequest { table_id, cascade };
453
454 let resp = self.inner.drop_materialized_view(request).await?;
455 Ok(resp
456 .version
457 .ok_or_else(|| anyhow!("wait version not set"))?)
458 }
459
460 pub async fn create_source(
461 &self,
462 source: PbSource,
463 graph: Option<StreamFragmentGraph>,
464 if_not_exists: bool,
465 ) -> Result<WaitVersion> {
466 let request = CreateSourceRequest {
467 source: Some(source),
468 fragment_graph: graph,
469 if_not_exists,
470 };
471
472 let resp = self.inner.create_source(request).await?;
473 Ok(resp
474 .version
475 .ok_or_else(|| anyhow!("wait version not set"))?)
476 }
477
478 pub async fn create_sink(
479 &self,
480 sink: PbSink,
481 graph: StreamFragmentGraph,
482 dependencies: HashSet<ObjectId>,
483 if_not_exists: bool,
484 ) -> Result<WaitVersion> {
485 let request = CreateSinkRequest {
486 sink: Some(sink),
487 fragment_graph: Some(graph),
488 dependencies: dependencies.into_iter().collect(),
489 if_not_exists,
490 };
491
492 let resp = self.inner.create_sink(request).await?;
493 Ok(resp
494 .version
495 .ok_or_else(|| anyhow!("wait version not set"))?)
496 }
497
498 pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
499 let request = CreateSubscriptionRequest {
500 subscription: Some(subscription),
501 };
502
503 let resp = self.inner.create_subscription(request).await?;
504 Ok(resp
505 .version
506 .ok_or_else(|| anyhow!("wait version not set"))?)
507 }
508
509 pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
510 let request = CreateFunctionRequest {
511 function: Some(function),
512 };
513 let resp = self.inner.create_function(request).await?;
514 Ok(resp
515 .version
516 .ok_or_else(|| anyhow!("wait version not set"))?)
517 }
518
519 pub async fn create_table(
520 &self,
521 source: Option<PbSource>,
522 table: PbTable,
523 graph: StreamFragmentGraph,
524 job_type: PbTableJobType,
525 if_not_exists: bool,
526 dependencies: HashSet<ObjectId>,
527 ) -> Result<WaitVersion> {
528 let request = CreateTableRequest {
529 materialized_view: Some(table),
530 fragment_graph: Some(graph),
531 source,
532 job_type: job_type as _,
533 if_not_exists,
534 dependencies: dependencies.into_iter().collect(),
535 };
536 let resp = self.inner.create_table(request).await?;
537 Ok(resp
539 .version
540 .ok_or_else(|| anyhow!("wait version not set"))?)
541 }
542
543 pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
544 let request = CommentOnRequest {
545 comment: Some(comment),
546 };
547 let resp = self.inner.comment_on(request).await?;
548 Ok(resp
549 .version
550 .ok_or_else(|| anyhow!("wait version not set"))?)
551 }
552
553 pub async fn alter_name(
554 &self,
555 object: alter_name_request::Object,
556 name: &str,
557 ) -> Result<WaitVersion> {
558 let request = AlterNameRequest {
559 object: Some(object),
560 new_name: name.to_owned(),
561 };
562 let resp = self.inner.alter_name(request).await?;
563 Ok(resp
564 .version
565 .ok_or_else(|| anyhow!("wait version not set"))?)
566 }
567
568 pub async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<WaitVersion> {
569 let request = AlterOwnerRequest {
570 object: Some(object),
571 owner_id,
572 };
573 let resp = self.inner.alter_owner(request).await?;
574 Ok(resp
575 .version
576 .ok_or_else(|| anyhow!("wait version not set"))?)
577 }
578
579 pub async fn alter_subscription_retention(
580 &self,
581 subscription_id: SubscriptionId,
582 retention_seconds: u64,
583 definition: String,
584 ) -> Result<WaitVersion> {
585 let request = AlterSubscriptionRetentionRequest {
586 subscription_id,
587 retention_seconds,
588 definition,
589 };
590 let resp = self.inner.alter_subscription_retention(request).await?;
591 Ok(resp
592 .version
593 .ok_or_else(|| anyhow!("wait version not set"))?)
594 }
595
596 pub async fn alter_database_param(
597 &self,
598 database_id: DatabaseId,
599 param: AlterDatabaseParam,
600 ) -> Result<WaitVersion> {
601 let request = match param {
602 AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
603 let barrier_interval_ms = OptionalUint32 {
604 value: barrier_interval_ms,
605 };
606 AlterDatabaseParamRequest {
607 database_id,
608 param: Some(alter_database_param_request::Param::BarrierIntervalMs(
609 barrier_interval_ms,
610 )),
611 }
612 }
613 AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
614 let checkpoint_frequency = OptionalUint64 {
615 value: checkpoint_frequency,
616 };
617 AlterDatabaseParamRequest {
618 database_id,
619 param: Some(alter_database_param_request::Param::CheckpointFrequency(
620 checkpoint_frequency,
621 )),
622 }
623 }
624 };
625 let resp = self.inner.alter_database_param(request).await?;
626 Ok(resp
627 .version
628 .ok_or_else(|| anyhow!("wait version not set"))?)
629 }
630
631 pub async fn alter_set_schema(
632 &self,
633 object: alter_set_schema_request::Object,
634 new_schema_id: SchemaId,
635 ) -> Result<WaitVersion> {
636 let request = AlterSetSchemaRequest {
637 new_schema_id,
638 object: Some(object),
639 };
640 let resp = self.inner.alter_set_schema(request).await?;
641 Ok(resp
642 .version
643 .ok_or_else(|| anyhow!("wait version not set"))?)
644 }
645
646 pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
647 let request = AlterSourceRequest {
648 source: Some(source),
649 };
650 let resp = self.inner.alter_source(request).await?;
651 Ok(resp
652 .version
653 .ok_or_else(|| anyhow!("wait version not set"))?)
654 }
655
656 pub async fn alter_parallelism(
657 &self,
658 job_id: JobId,
659 parallelism: PbTableParallelism,
660 deferred: bool,
661 ) -> Result<()> {
662 let request = AlterParallelismRequest {
663 table_id: job_id,
664 parallelism: Some(parallelism),
665 deferred,
666 adaptive_parallelism_strategy: None,
667 };
668
669 self.inner.alter_parallelism(request).await?;
670 Ok(())
671 }
672
673 pub async fn alter_backfill_parallelism(
674 &self,
675 job_id: JobId,
676 parallelism: Option<PbTableParallelism>,
677 deferred: bool,
678 ) -> Result<()> {
679 let request = AlterBackfillParallelismRequest {
680 table_id: job_id,
681 parallelism,
682 deferred,
683 adaptive_parallelism_strategy: None,
684 };
685
686 self.inner.alter_backfill_parallelism(request).await?;
687 Ok(())
688 }
689
690 pub async fn alter_streaming_job_config(
691 &self,
692 job_id: JobId,
693 entries_to_add: HashMap<String, String>,
694 keys_to_remove: Vec<String>,
695 ) -> Result<()> {
696 let request = AlterStreamingJobConfigRequest {
697 job_id,
698 entries_to_add,
699 keys_to_remove,
700 };
701
702 self.inner.alter_streaming_job_config(request).await?;
703 Ok(())
704 }
705
706 pub async fn alter_fragment_parallelism(
707 &self,
708 fragment_ids: Vec<FragmentId>,
709 parallelism: Option<PbTableParallelism>,
710 ) -> Result<()> {
711 let request = AlterFragmentParallelismRequest {
712 fragment_ids,
713 parallelism,
714 };
715
716 self.inner.alter_fragment_parallelism(request).await?;
717 Ok(())
718 }
719
720 pub async fn alter_cdc_table_backfill_parallelism(
721 &self,
722 table_id: JobId,
723 parallelism: PbTableParallelism,
724 ) -> Result<()> {
725 let request = AlterCdcTableBackfillParallelismRequest {
726 table_id,
727 parallelism: Some(parallelism),
728 };
729 self.inner
730 .alter_cdc_table_backfill_parallelism(request)
731 .await?;
732 Ok(())
733 }
734
735 pub async fn alter_resource_group(
736 &self,
737 table_id: TableId,
738 resource_group: Option<String>,
739 deferred: bool,
740 ) -> Result<()> {
741 let request = AlterResourceGroupRequest {
742 table_id,
743 resource_group,
744 deferred,
745 };
746
747 self.inner.alter_resource_group(request).await?;
748 Ok(())
749 }
750
751 pub async fn alter_swap_rename(
752 &self,
753 object: alter_swap_rename_request::Object,
754 ) -> Result<WaitVersion> {
755 let request = AlterSwapRenameRequest {
756 object: Some(object),
757 };
758 let resp = self.inner.alter_swap_rename(request).await?;
759 Ok(resp
760 .version
761 .ok_or_else(|| anyhow!("wait version not set"))?)
762 }
763
764 pub async fn alter_secret(
765 &self,
766 secret_id: SecretId,
767 secret_name: String,
768 database_id: DatabaseId,
769 schema_id: SchemaId,
770 owner_id: UserId,
771 value: Vec<u8>,
772 ) -> Result<WaitVersion> {
773 let request = AlterSecretRequest {
774 secret_id,
775 name: secret_name,
776 database_id,
777 schema_id,
778 owner_id,
779 value,
780 };
781 let resp = self.inner.alter_secret(request).await?;
782 Ok(resp
783 .version
784 .ok_or_else(|| anyhow!("wait version not set"))?)
785 }
786
787 pub async fn replace_job(
788 &self,
789 graph: StreamFragmentGraph,
790 replace_job: ReplaceJob,
791 ) -> Result<WaitVersion> {
792 let request = ReplaceJobPlanRequest {
793 plan: Some(ReplaceJobPlan {
794 fragment_graph: Some(graph),
795 replace_job: Some(replace_job),
796 }),
797 };
798 let resp = self.inner.replace_job_plan(request).await?;
799 Ok(resp
801 .version
802 .ok_or_else(|| anyhow!("wait version not set"))?)
803 }
804
805 pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
806 let request = AutoSchemaChangeRequest {
807 schema_change: Some(schema_change),
808 };
809 let _ = self.inner.auto_schema_change(request).await?;
810 Ok(())
811 }
812
813 pub async fn create_view(
814 &self,
815 view: PbView,
816 dependencies: HashSet<ObjectId>,
817 ) -> Result<WaitVersion> {
818 let request = CreateViewRequest {
819 view: Some(view),
820 dependencies: dependencies.into_iter().collect(),
821 };
822 let resp = self.inner.create_view(request).await?;
823 Ok(resp
825 .version
826 .ok_or_else(|| anyhow!("wait version not set"))?)
827 }
828
829 pub async fn create_index(
830 &self,
831 index: PbIndex,
832 table: PbTable,
833 graph: StreamFragmentGraph,
834 if_not_exists: bool,
835 ) -> Result<WaitVersion> {
836 let request = CreateIndexRequest {
837 index: Some(index),
838 index_table: Some(table),
839 fragment_graph: Some(graph),
840 if_not_exists,
841 };
842 let resp = self.inner.create_index(request).await?;
843 Ok(resp
845 .version
846 .ok_or_else(|| anyhow!("wait version not set"))?)
847 }
848
849 pub async fn drop_table(
850 &self,
851 source_id: Option<SourceId>,
852 table_id: TableId,
853 cascade: bool,
854 ) -> Result<WaitVersion> {
855 let request = DropTableRequest {
856 source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
857 table_id,
858 cascade,
859 };
860
861 let resp = self.inner.drop_table(request).await?;
862 Ok(resp
863 .version
864 .ok_or_else(|| anyhow!("wait version not set"))?)
865 }
866
867 pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
868 let request = CompactIcebergTableRequest { sink_id };
869 let resp = self.inner.compact_iceberg_table(request).await?;
870 Ok(resp.task_id)
871 }
872
873 pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
874 let request = ExpireIcebergTableSnapshotsRequest { sink_id };
875 let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
876 Ok(())
877 }
878
879 pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
880 let request = DropViewRequest { view_id, cascade };
881 let resp = self.inner.drop_view(request).await?;
882 Ok(resp
883 .version
884 .ok_or_else(|| anyhow!("wait version not set"))?)
885 }
886
887 pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
888 let request = DropSourceRequest { source_id, cascade };
889 let resp = self.inner.drop_source(request).await?;
890 Ok(resp
891 .version
892 .ok_or_else(|| anyhow!("wait version not set"))?)
893 }
894
895 pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
896 let request = ResetSourceRequest { source_id };
897 let resp = self.inner.reset_source(request).await?;
898 Ok(resp
899 .version
900 .ok_or_else(|| anyhow!("wait version not set"))?)
901 }
902
903 pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
904 let request = DropSinkRequest { sink_id, cascade };
905 let resp = self.inner.drop_sink(request).await?;
906 Ok(resp
907 .version
908 .ok_or_else(|| anyhow!("wait version not set"))?)
909 }
910
911 pub async fn drop_subscription(
912 &self,
913 subscription_id: SubscriptionId,
914 cascade: bool,
915 ) -> Result<WaitVersion> {
916 let request = DropSubscriptionRequest {
917 subscription_id,
918 cascade,
919 };
920 let resp = self.inner.drop_subscription(request).await?;
921 Ok(resp
922 .version
923 .ok_or_else(|| anyhow!("wait version not set"))?)
924 }
925
926 pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
927 let request = DropIndexRequest { index_id, cascade };
928 let resp = self.inner.drop_index(request).await?;
929 Ok(resp
930 .version
931 .ok_or_else(|| anyhow!("wait version not set"))?)
932 }
933
934 pub async fn drop_function(
935 &self,
936 function_id: FunctionId,
937 cascade: bool,
938 ) -> Result<WaitVersion> {
939 let request = DropFunctionRequest {
940 function_id,
941 cascade,
942 };
943 let resp = self.inner.drop_function(request).await?;
944 Ok(resp
945 .version
946 .ok_or_else(|| anyhow!("wait version not set"))?)
947 }
948
949 pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
950 let request = DropDatabaseRequest { database_id };
951 let resp = self.inner.drop_database(request).await?;
952 Ok(resp
953 .version
954 .ok_or_else(|| anyhow!("wait version not set"))?)
955 }
956
957 pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
958 let request = DropSchemaRequest { schema_id, cascade };
959 let resp = self.inner.drop_schema(request).await?;
960 Ok(resp
961 .version
962 .ok_or_else(|| anyhow!("wait version not set"))?)
963 }
964
965 pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
967 let request = CreateUserRequest { user: Some(user) };
968 let resp = self.inner.create_user(request).await?;
969 Ok(resp.version)
970 }
971
972 pub async fn drop_user(&self, user_id: UserId) -> Result<u64> {
973 let request = DropUserRequest { user_id };
974 let resp = self.inner.drop_user(request).await?;
975 Ok(resp.version)
976 }
977
978 pub async fn update_user(
979 &self,
980 user: UserInfo,
981 update_fields: Vec<UpdateField>,
982 ) -> Result<u64> {
983 let request = UpdateUserRequest {
984 user: Some(user),
985 update_fields: update_fields
986 .into_iter()
987 .map(|field| field as i32)
988 .collect::<Vec<_>>(),
989 };
990 let resp = self.inner.update_user(request).await?;
991 Ok(resp.version)
992 }
993
994 pub async fn grant_privilege(
995 &self,
996 user_ids: Vec<UserId>,
997 privileges: Vec<GrantPrivilege>,
998 with_grant_option: bool,
999 granted_by: UserId,
1000 ) -> Result<u64> {
1001 let request = GrantPrivilegeRequest {
1002 user_ids,
1003 privileges,
1004 with_grant_option,
1005 granted_by,
1006 };
1007 let resp = self.inner.grant_privilege(request).await?;
1008 Ok(resp.version)
1009 }
1010
1011 pub async fn revoke_privilege(
1012 &self,
1013 user_ids: Vec<UserId>,
1014 privileges: Vec<GrantPrivilege>,
1015 granted_by: UserId,
1016 revoke_by: UserId,
1017 revoke_grant_option: bool,
1018 cascade: bool,
1019 ) -> Result<u64> {
1020 let request = RevokePrivilegeRequest {
1021 user_ids,
1022 privileges,
1023 granted_by,
1024 revoke_by,
1025 revoke_grant_option,
1026 cascade,
1027 };
1028 let resp = self.inner.revoke_privilege(request).await?;
1029 Ok(resp.version)
1030 }
1031
1032 pub async fn alter_default_privilege(
1033 &self,
1034 user_ids: Vec<UserId>,
1035 database_id: DatabaseId,
1036 schema_ids: Vec<SchemaId>,
1037 operation: AlterDefaultPrivilegeOperation,
1038 granted_by: UserId,
1039 ) -> Result<()> {
1040 let request = AlterDefaultPrivilegeRequest {
1041 user_ids,
1042 database_id,
1043 schema_ids,
1044 operation: Some(operation),
1045 granted_by,
1046 };
1047 self.inner.alter_default_privilege(request).await?;
1048 Ok(())
1049 }
1050
1051 pub async fn unregister(&self) -> Result<()> {
1053 let request = DeleteWorkerNodeRequest {
1054 host: Some(self.host_addr.to_protobuf()),
1055 };
1056 self.inner.delete_worker_node(request).await?;
1057 self.shutting_down.store(true, Relaxed);
1058 Ok(())
1059 }
1060
1061 pub async fn try_unregister(&self) {
1063 match self.unregister().await {
1064 Ok(_) => {
1065 tracing::info!(
1066 worker_id = %self.worker_id(),
1067 "successfully unregistered from meta service",
1068 )
1069 }
1070 Err(e) => {
1071 tracing::warn!(
1072 error = %e.as_report(),
1073 worker_id = %self.worker_id(),
1074 "failed to unregister from meta service",
1075 );
1076 }
1077 }
1078 }
1079
1080 pub async fn list_worker_nodes(
1081 &self,
1082 worker_type: Option<WorkerType>,
1083 ) -> Result<Vec<WorkerNode>> {
1084 let request = ListAllNodesRequest {
1085 worker_type: worker_type.map(Into::into),
1086 include_starting_nodes: true,
1087 };
1088 let resp = self.inner.list_all_nodes(request).await?;
1089 Ok(resp.nodes)
1090 }
1091
1092 pub fn start_heartbeat_loop(
1094 meta_client: MetaClient,
1095 min_interval: Duration,
1096 ) -> (JoinHandle<()>, Sender<()>) {
1097 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1098 let join_handle = tokio::spawn(async move {
1099 let mut min_interval_ticker = tokio::time::interval(min_interval);
1100 loop {
1101 tokio::select! {
1102 biased;
1103 _ = &mut shutdown_rx => {
1105 tracing::info!("Heartbeat loop is stopped");
1106 return;
1107 }
1108 _ = min_interval_ticker.tick() => {},
1110 }
1111 tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
1112 match tokio::time::timeout(
1113 min_interval * 3,
1115 meta_client.send_heartbeat(),
1116 )
1117 .await
1118 {
1119 Ok(Ok(_)) => {}
1120 Ok(Err(err)) => {
1121 tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
1122 }
1123 Err(_) => {
1124 tracing::warn!("Failed to send_heartbeat: timeout");
1125 }
1126 }
1127 }
1128 });
1129 (join_handle, shutdown_tx)
1130 }
1131
1132 pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
1133 let request = RisectlListStateTablesRequest {};
1134 let resp = self.inner.risectl_list_state_tables(request).await?;
1135 Ok(resp.tables)
1136 }
1137
1138 pub async fn risectl_resume_backfill(
1139 &self,
1140 request: RisectlResumeBackfillRequest,
1141 ) -> Result<()> {
1142 self.inner.risectl_resume_backfill(request).await?;
1143 Ok(())
1144 }
1145
1146 pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
1147 let request = FlushRequest { database_id };
1148 let resp = self.inner.flush(request).await?;
1149 Ok(resp.hummock_version_id)
1150 }
1151
1152 pub async fn wait(&self, job_id: Option<JobId>) -> Result<WaitVersion> {
1153 let request = WaitRequest { job_id };
1154 let resp = self.inner.wait(request).await?;
1155 Ok(resp
1156 .version
1157 .ok_or_else(|| anyhow!("wait version not set"))?)
1158 }
1159
1160 pub async fn recover(&self) -> Result<()> {
1161 let request = RecoverRequest {};
1162 self.inner.recover(request).await?;
1163 Ok(())
1164 }
1165
1166 pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
1167 let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
1168 let resp = self.inner.cancel_creating_jobs(request).await?;
1169 Ok(resp.canceled_jobs)
1170 }
1171
1172 pub async fn list_table_fragments(
1173 &self,
1174 job_ids: &[JobId],
1175 ) -> Result<HashMap<JobId, TableFragmentInfo>> {
1176 let request = ListTableFragmentsRequest {
1177 table_ids: job_ids.to_vec(),
1178 };
1179 let resp = self.inner.list_table_fragments(request).await?;
1180 Ok(resp.table_fragments)
1181 }
1182
1183 pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1184 let resp = self
1185 .inner
1186 .list_streaming_job_states(ListStreamingJobStatesRequest {})
1187 .await?;
1188 Ok(resp.states)
1189 }
1190
1191 pub async fn list_fragment_distributions(
1192 &self,
1193 include_node: bool,
1194 ) -> Result<Vec<FragmentDistribution>> {
1195 let resp = self
1196 .inner
1197 .list_fragment_distribution(ListFragmentDistributionRequest {
1198 include_node: Some(include_node),
1199 })
1200 .await?;
1201 Ok(resp.distributions)
1202 }
1203
1204 pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
1205 let resp = self
1206 .inner
1207 .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {
1208 include_node: Some(true),
1209 })
1210 .await?;
1211 Ok(resp.distributions)
1212 }
1213
1214 pub async fn get_fragment_by_id(
1215 &self,
1216 fragment_id: FragmentId,
1217 ) -> Result<Option<FragmentDistribution>> {
1218 let resp = self
1219 .inner
1220 .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
1221 .await?;
1222 Ok(resp.distribution)
1223 }
1224
1225 pub async fn get_fragment_vnodes(
1226 &self,
1227 fragment_id: FragmentId,
1228 ) -> Result<Vec<(ActorId, Vec<u32>)>> {
1229 let resp = self
1230 .inner
1231 .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
1232 .await?;
1233 Ok(resp
1234 .actor_vnodes
1235 .into_iter()
1236 .map(|actor| (actor.actor_id, actor.vnode_indices))
1237 .collect())
1238 }
1239
1240 pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
1241 let resp = self
1242 .inner
1243 .get_actor_vnodes(GetActorVnodesRequest { actor_id })
1244 .await?;
1245 Ok(resp.vnode_indices)
1246 }
1247
1248 pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1249 let resp = self
1250 .inner
1251 .list_actor_states(ListActorStatesRequest {})
1252 .await?;
1253 Ok(resp.states)
1254 }
1255
1256 pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1257 let resp = self
1258 .inner
1259 .list_actor_splits(ListActorSplitsRequest {})
1260 .await?;
1261
1262 Ok(resp.actor_splits)
1263 }
1264
1265 pub async fn pause(&self) -> Result<PauseResponse> {
1266 let request = PauseRequest {};
1267 let resp = self.inner.pause(request).await?;
1268 Ok(resp)
1269 }
1270
1271 pub async fn resume(&self) -> Result<ResumeResponse> {
1272 let request = ResumeRequest {};
1273 let resp = self.inner.resume(request).await?;
1274 Ok(resp)
1275 }
1276
1277 pub async fn apply_throttle(
1278 &self,
1279 throttle_target: PbThrottleTarget,
1280 throttle_type: risingwave_pb::common::PbThrottleType,
1281 id: u32,
1282 rate: Option<u32>,
1283 ) -> Result<ApplyThrottleResponse> {
1284 let request = ApplyThrottleRequest {
1285 throttle_target: throttle_target as i32,
1286 throttle_type: throttle_type as i32,
1287 id,
1288 rate,
1289 };
1290 let resp = self.inner.apply_throttle(request).await?;
1291 Ok(resp)
1292 }
1293
1294 pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1295 let resp = self
1296 .inner
1297 .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1298 .await?;
1299 Ok(resp.get_status().unwrap())
1300 }
1301
1302 pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1303 let request = GetClusterInfoRequest {};
1304 let resp = self.inner.get_cluster_info(request).await?;
1305 Ok(resp)
1306 }
1307
1308 pub async fn reschedule(
1309 &self,
1310 worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1311 revision: u64,
1312 resolve_no_shuffle_upstream: bool,
1313 ) -> Result<(bool, u64)> {
1314 let request = RescheduleRequest {
1315 revision,
1316 resolve_no_shuffle_upstream,
1317 worker_reschedules,
1318 };
1319 let resp = self.inner.reschedule(request).await?;
1320 Ok((resp.success, resp.revision))
1321 }
1322
1323 pub async fn risectl_get_pinned_versions_summary(
1324 &self,
1325 ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1326 let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1327 self.inner
1328 .rise_ctl_get_pinned_versions_summary(request)
1329 .await
1330 }
1331
1332 pub async fn risectl_get_checkpoint_hummock_version(
1333 &self,
1334 ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1335 let request = RiseCtlGetCheckpointVersionRequest {};
1336 self.inner.rise_ctl_get_checkpoint_version(request).await
1337 }
1338
1339 pub async fn risectl_pause_hummock_version_checkpoint(
1340 &self,
1341 ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1342 let request = RiseCtlPauseVersionCheckpointRequest {};
1343 self.inner.rise_ctl_pause_version_checkpoint(request).await
1344 }
1345
1346 pub async fn risectl_resume_hummock_version_checkpoint(
1347 &self,
1348 ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1349 let request = RiseCtlResumeVersionCheckpointRequest {};
1350 self.inner.rise_ctl_resume_version_checkpoint(request).await
1351 }
1352
1353 pub async fn init_metadata_for_replay(
1354 &self,
1355 tables: Vec<PbTable>,
1356 compaction_groups: Vec<CompactionGroupInfo>,
1357 ) -> Result<()> {
1358 let req = InitMetadataForReplayRequest {
1359 tables,
1360 compaction_groups,
1361 };
1362 let _resp = self.inner.init_metadata_for_replay(req).await?;
1363 Ok(())
1364 }
1365
1366 pub async fn replay_version_delta(
1367 &self,
1368 version_delta: HummockVersionDelta,
1369 ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1370 let req = ReplayVersionDeltaRequest {
1371 version_delta: Some(version_delta.into()),
1372 };
1373 let resp = self.inner.replay_version_delta(req).await?;
1374 Ok((
1375 HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1376 resp.modified_compaction_groups,
1377 ))
1378 }
1379
1380 pub async fn list_version_deltas(
1381 &self,
1382 start_id: HummockVersionId,
1383 num_limit: u32,
1384 committed_epoch_limit: HummockEpoch,
1385 ) -> Result<Vec<HummockVersionDelta>> {
1386 let req = ListVersionDeltasRequest {
1387 start_id,
1388 num_limit,
1389 committed_epoch_limit,
1390 };
1391 Ok(self
1392 .inner
1393 .list_version_deltas(req)
1394 .await?
1395 .version_deltas
1396 .unwrap()
1397 .version_deltas
1398 .iter()
1399 .map(HummockVersionDelta::from_rpc_protobuf)
1400 .collect())
1401 }
1402
1403 pub async fn trigger_compaction_deterministic(
1404 &self,
1405 version_id: HummockVersionId,
1406 compaction_groups: Vec<CompactionGroupId>,
1407 ) -> Result<()> {
1408 let req = TriggerCompactionDeterministicRequest {
1409 version_id,
1410 compaction_groups,
1411 };
1412 self.inner.trigger_compaction_deterministic(req).await?;
1413 Ok(())
1414 }
1415
1416 pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1417 let req = DisableCommitEpochRequest {};
1418 Ok(HummockVersion::from_rpc_protobuf(
1419 &self
1420 .inner
1421 .disable_commit_epoch(req)
1422 .await?
1423 .current_version
1424 .unwrap(),
1425 ))
1426 }
1427
1428 pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1429 let req = GetAssignedCompactTaskNumRequest {};
1430 let resp = self.inner.get_assigned_compact_task_num(req).await?;
1431 Ok(resp.num_tasks as usize)
1432 }
1433
1434 pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1435 let req = RiseCtlListCompactionGroupRequest {};
1436 let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1437 Ok(resp.compaction_groups)
1438 }
1439
1440 pub async fn risectl_update_compaction_config(
1441 &self,
1442 compaction_groups: &[CompactionGroupId],
1443 configs: &[MutableConfig],
1444 ) -> Result<()> {
1445 let req = RiseCtlUpdateCompactionConfigRequest {
1446 compaction_group_ids: compaction_groups.to_vec(),
1447 configs: configs
1448 .iter()
1449 .map(
1450 |c| rise_ctl_update_compaction_config_request::MutableConfig {
1451 mutable_config: Some(c.clone()),
1452 },
1453 )
1454 .collect(),
1455 };
1456 let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1457 Ok(())
1458 }
1459
1460 pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1461 let req = BackupMetaRequest { remarks };
1462 let resp = self.inner.backup_meta(req).await?;
1463 Ok(resp.job_id)
1464 }
1465
1466 pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1467 let req = GetBackupJobStatusRequest { job_id };
1468 let resp = self.inner.get_backup_job_status(req).await?;
1469 Ok((resp.job_status(), resp.message))
1470 }
1471
1472 pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1473 let req = DeleteMetaSnapshotRequest {
1474 snapshot_ids: snapshot_ids.to_vec(),
1475 };
1476 let _resp = self.inner.delete_meta_snapshot(req).await?;
1477 Ok(())
1478 }
1479
1480 pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1481 let req = GetMetaSnapshotManifestRequest {};
1482 let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1483 Ok(resp.manifest.expect("should exist"))
1484 }
1485
1486 pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1487 let req = GetTelemetryInfoRequest {};
1488 let resp = self.inner.get_telemetry_info(req).await?;
1489 Ok(resp)
1490 }
1491
1492 pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1493 let req = GetMetaStoreInfoRequest {};
1494 let resp = self.inner.get_meta_store_info(req).await?;
1495 Ok(resp.meta_store_endpoint)
1496 }
1497
1498 pub async fn alter_sink_props(
1499 &self,
1500 sink_id: SinkId,
1501 changed_props: BTreeMap<String, String>,
1502 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1503 connector_conn_ref: Option<ConnectionId>,
1504 ) -> Result<()> {
1505 let req = AlterConnectorPropsRequest {
1506 object_id: sink_id.as_raw_id(),
1507 changed_props: changed_props.into_iter().collect(),
1508 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1509 connector_conn_ref,
1510 object_type: AlterConnectorPropsObject::Sink as i32,
1511 extra_options: None,
1512 };
1513 let _resp = self.inner.alter_connector_props(req).await?;
1514 Ok(())
1515 }
1516
1517 pub async fn alter_iceberg_table_props(
1518 &self,
1519 table_id: TableId,
1520 sink_id: SinkId,
1521 source_id: SourceId,
1522 changed_props: BTreeMap<String, String>,
1523 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1524 connector_conn_ref: Option<ConnectionId>,
1525 ) -> Result<()> {
1526 let req = AlterConnectorPropsRequest {
1527 object_id: table_id.as_raw_id(),
1528 changed_props: changed_props.into_iter().collect(),
1529 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1530 connector_conn_ref,
1531 object_type: AlterConnectorPropsObject::IcebergTable as i32,
1532 extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
1533 sink_id,
1534 source_id,
1535 })),
1536 };
1537 let _resp = self.inner.alter_connector_props(req).await?;
1538 Ok(())
1539 }
1540
1541 pub async fn alter_source_connector_props(
1542 &self,
1543 source_id: SourceId,
1544 changed_props: BTreeMap<String, String>,
1545 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1546 connector_conn_ref: Option<ConnectionId>,
1547 ) -> Result<()> {
1548 let req = AlterConnectorPropsRequest {
1549 object_id: source_id.as_raw_id(),
1550 changed_props: changed_props.into_iter().collect(),
1551 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1552 connector_conn_ref,
1553 object_type: AlterConnectorPropsObject::Source as i32,
1554 extra_options: None,
1555 };
1556 let _resp = self.inner.alter_connector_props(req).await?;
1557 Ok(())
1558 }
1559
1560 pub async fn alter_connection_connector_props(
1561 &self,
1562 connection_id: u32,
1563 changed_props: BTreeMap<String, String>,
1564 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1565 ) -> Result<()> {
1566 let req = AlterConnectorPropsRequest {
1567 object_id: connection_id,
1568 changed_props: changed_props.into_iter().collect(),
1569 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1570 connector_conn_ref: None, object_type: AlterConnectorPropsObject::Connection as i32,
1572 extra_options: None,
1573 };
1574 let _resp = self.inner.alter_connector_props(req).await?;
1575 Ok(())
1576 }
1577
1578 pub async fn alter_source_properties_safe(
1581 &self,
1582 source_id: SourceId,
1583 changed_props: BTreeMap<String, String>,
1584 changed_secret_refs: BTreeMap<String, PbSecretRef>,
1585 reset_splits: bool,
1586 ) -> Result<()> {
1587 let req = AlterSourcePropertiesSafeRequest {
1588 source_id: source_id.as_raw_id(),
1589 changed_props: changed_props.into_iter().collect(),
1590 changed_secret_refs: changed_secret_refs.into_iter().collect(),
1591 options: Some(PropertyUpdateOptions { reset_splits }),
1592 };
1593 let _resp = self.inner.alter_source_properties_safe(req).await?;
1594 Ok(())
1595 }
1596
1597 pub async fn reset_source_splits(&self, source_id: SourceId) -> Result<()> {
1600 let req = ResetSourceSplitsRequest {
1601 source_id: source_id.as_raw_id(),
1602 };
1603 let _resp = self.inner.reset_source_splits(req).await?;
1604 Ok(())
1605 }
1606
1607 pub async fn inject_source_offsets(
1610 &self,
1611 source_id: SourceId,
1612 split_offsets: HashMap<String, String>,
1613 ) -> Result<Vec<String>> {
1614 let req = InjectSourceOffsetsRequest {
1615 source_id: source_id.as_raw_id(),
1616 split_offsets,
1617 };
1618 let resp = self.inner.inject_source_offsets(req).await?;
1619 Ok(resp.applied_split_ids)
1620 }
1621
1622 pub async fn set_system_param(
1623 &self,
1624 param: String,
1625 value: Option<String>,
1626 ) -> Result<Option<SystemParamsReader>> {
1627 let req = SetSystemParamRequest { param, value };
1628 let resp = self.inner.set_system_param(req).await?;
1629 Ok(resp.params.map(SystemParamsReader::from))
1630 }
1631
1632 pub async fn get_session_params(&self) -> Result<String> {
1633 let req = GetSessionParamsRequest {};
1634 let resp = self.inner.get_session_params(req).await?;
1635 Ok(resp.params)
1636 }
1637
1638 pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1639 let req = SetSessionParamRequest { param, value };
1640 let resp = self.inner.set_session_param(req).await?;
1641 Ok(resp.param)
1642 }
1643
1644 pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1645 let req = GetDdlProgressRequest {};
1646 let resp = self.inner.get_ddl_progress(req).await?;
1647 Ok(resp.ddl_progress)
1648 }
1649
1650 pub async fn split_compaction_group(
1651 &self,
1652 group_id: CompactionGroupId,
1653 table_ids_to_new_group: &[TableId],
1654 partition_vnode_count: u32,
1655 ) -> Result<CompactionGroupId> {
1656 let req = SplitCompactionGroupRequest {
1657 group_id,
1658 table_ids: table_ids_to_new_group.to_vec(),
1659 partition_vnode_count,
1660 };
1661 let resp = self.inner.split_compaction_group(req).await?;
1662 Ok(resp.new_group_id)
1663 }
1664
1665 pub async fn get_tables(
1666 &self,
1667 table_ids: Vec<TableId>,
1668 include_dropped_tables: bool,
1669 ) -> Result<HashMap<TableId, Table>> {
1670 let req = GetTablesRequest {
1671 table_ids,
1672 include_dropped_tables,
1673 };
1674 let resp = self.inner.get_tables(req).await?;
1675 Ok(resp.tables)
1676 }
1677
1678 pub async fn list_serving_vnode_mappings(
1679 &self,
1680 ) -> Result<HashMap<FragmentId, (JobId, WorkerSlotMapping)>> {
1681 let req = GetServingVnodeMappingsRequest {};
1682 let resp = self.inner.get_serving_vnode_mappings(req).await?;
1683 let mappings = resp
1684 .worker_slot_mappings
1685 .into_iter()
1686 .map(|p| {
1687 (
1688 p.fragment_id,
1689 (
1690 resp.fragment_to_table
1691 .get(&p.fragment_id)
1692 .cloned()
1693 .unwrap_or_default(),
1694 WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1695 ),
1696 )
1697 })
1698 .collect();
1699 Ok(mappings)
1700 }
1701
1702 pub async fn risectl_list_compaction_status(
1703 &self,
1704 ) -> Result<(
1705 Vec<CompactStatus>,
1706 Vec<CompactTaskAssignment>,
1707 Vec<CompactTaskProgress>,
1708 )> {
1709 let req = RiseCtlListCompactionStatusRequest {};
1710 let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1711 Ok((
1712 resp.compaction_statuses,
1713 resp.task_assignment,
1714 resp.task_progress,
1715 ))
1716 }
1717
1718 pub async fn get_compaction_score(
1719 &self,
1720 compaction_group_id: CompactionGroupId,
1721 ) -> Result<Vec<PickerInfo>> {
1722 let req = GetCompactionScoreRequest {
1723 compaction_group_id,
1724 };
1725 let resp = self.inner.get_compaction_score(req).await?;
1726 Ok(resp.scores)
1727 }
1728
1729 pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1730 let req = RiseCtlRebuildTableStatsRequest {};
1731 let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1732 Ok(())
1733 }
1734
1735 pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1736 let req = ListBranchedObjectRequest {};
1737 let resp = self.inner.list_branched_object(req).await?;
1738 Ok(resp.branched_objects)
1739 }
1740
1741 pub async fn list_active_write_limit(&self) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
1742 let req = ListActiveWriteLimitRequest {};
1743 let resp = self.inner.list_active_write_limit(req).await?;
1744 Ok(resp.write_limits)
1745 }
1746
1747 pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1748 let req = ListHummockMetaConfigRequest {};
1749 let resp = self.inner.list_hummock_meta_config(req).await?;
1750 Ok(resp.configs)
1751 }
1752
1753 pub async fn get_table_change_logs(
1754 &self,
1755 epoch_only: bool,
1756 start_epoch_inclusive: Option<u64>,
1757 end_epoch_inclusive: Option<u64>,
1758 table_ids: Option<HashSet<TableId>>,
1759 exclude_empty: bool,
1760 limit: Option<u32>,
1761 ) -> Result<TableChangeLogs> {
1762 let req = GetTableChangeLogsRequest {
1763 epoch_only,
1764 start_epoch_inclusive,
1765 end_epoch_inclusive,
1766 table_ids: table_ids.map(|iter| PbTableFilter {
1767 table_ids: iter.into_iter().collect::<Vec<_>>(),
1768 }),
1769 exclude_empty,
1770 limit,
1771 };
1772 let resp = self.inner.get_table_change_logs(req).await?;
1773 Ok(resp
1774 .table_change_logs
1775 .into_iter()
1776 .map(|(id, change_log)| (TableId::new(id), TableChangeLog::from_protobuf(&change_log)))
1777 .collect())
1778 }
1779
1780 pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1781 let _resp = self
1782 .inner
1783 .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1784 .await?;
1785
1786 Ok(())
1787 }
1788
1789 pub async fn rw_cloud_validate_source(
1790 &self,
1791 source_type: SourceType,
1792 source_config: HashMap<String, String>,
1793 ) -> Result<RwCloudValidateSourceResponse> {
1794 let req = RwCloudValidateSourceRequest {
1795 source_type: source_type.into(),
1796 source_config,
1797 };
1798 let resp = self.inner.rw_cloud_validate_source(req).await?;
1799 Ok(resp)
1800 }
1801
1802 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1803 self.inner.core.read().await.sink_coordinate_client.clone()
1804 }
1805
1806 pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1807 let req = ListCompactTaskAssignmentRequest {};
1808 let resp = self.inner.list_compact_task_assignment(req).await?;
1809 Ok(resp.task_assignment)
1810 }
1811
1812 pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1813 let req = ListEventLogRequest::default();
1814 let resp = self.inner.list_event_log(req).await?;
1815 Ok(resp.event_logs)
1816 }
1817
1818 pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1819 let req = ListCompactTaskProgressRequest {};
1820 let resp = self.inner.list_compact_task_progress(req).await?;
1821 Ok(resp.task_progress)
1822 }
1823
/// `madsim` build of the panic-event reporter: a stub that does nothing
/// with its arguments.
#[cfg(madsim)]
pub fn try_add_panic_event_blocking(
    &self,
    panic_info: impl Display,
    timeout_millis: Option<u64>,
) {
    // Intentionally empty; the arguments are only consumed.
    let _ = (panic_info, timeout_millis);
}
1831
1832 #[cfg(not(madsim))]
1834 pub fn try_add_panic_event_blocking(
1835 &self,
1836 panic_info: impl Display,
1837 timeout_millis: Option<u64>,
1838 ) {
1839 let event = event_log::EventWorkerNodePanic {
1840 worker_id: self.worker_id,
1841 worker_type: self.worker_type.into(),
1842 host_addr: Some(self.host_addr.to_protobuf()),
1843 panic_info: format!("{panic_info}"),
1844 };
1845 let grpc_meta_client = self.inner.clone();
1846 let _ = thread::spawn(move || {
1847 let rt = tokio::runtime::Builder::new_current_thread()
1848 .enable_all()
1849 .build()
1850 .unwrap();
1851 let req = AddEventLogRequest {
1852 event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1853 };
1854 rt.block_on(async {
1855 let _ = tokio::time::timeout(
1856 Duration::from_millis(timeout_millis.unwrap_or(1000)),
1857 grpc_meta_client.add_event_log(req),
1858 )
1859 .await;
1860 });
1861 })
1862 .join();
1863 }
1864
1865 pub async fn add_sink_fail_evet(
1866 &self,
1867 sink_id: SinkId,
1868 sink_name: String,
1869 connector: String,
1870 error: String,
1871 ) -> Result<()> {
1872 let event = event_log::EventSinkFail {
1873 sink_id,
1874 sink_name,
1875 connector,
1876 error,
1877 };
1878 let req = AddEventLogRequest {
1879 event: Some(add_event_log_request::Event::SinkFail(event)),
1880 };
1881 self.inner.add_event_log(req).await?;
1882 Ok(())
1883 }
1884
1885 pub async fn add_cdc_auto_schema_change_fail_event(
1886 &self,
1887 source_id: SourceId,
1888 table_name: String,
1889 cdc_table_id: String,
1890 upstream_ddl: String,
1891 fail_info: String,
1892 ) -> Result<()> {
1893 let event = event_log::EventAutoSchemaChangeFail {
1894 table_id: source_id.as_cdc_table_id(),
1895 table_name,
1896 cdc_table_id,
1897 upstream_ddl,
1898 fail_info,
1899 };
1900 let req = AddEventLogRequest {
1901 event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
1902 };
1903 self.inner.add_event_log(req).await?;
1904 Ok(())
1905 }
1906
1907 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1908 let req = CancelCompactTaskRequest {
1909 task_id,
1910 task_status: task_status as _,
1911 };
1912 let resp = self.inner.cancel_compact_task(req).await?;
1913 Ok(resp.ret)
1914 }
1915
1916 pub async fn get_version_by_epoch(
1917 &self,
1918 epoch: HummockEpoch,
1919 table_id: TableId,
1920 ) -> Result<PbHummockVersion> {
1921 let req = GetVersionByEpochRequest { epoch, table_id };
1922 let resp = self.inner.get_version_by_epoch(req).await?;
1923 Ok(resp.version.unwrap())
1924 }
1925
1926 pub async fn get_cluster_limits(
1927 &self,
1928 ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1929 let req = GetClusterLimitsRequest {};
1930 let resp = self.inner.get_cluster_limits(req).await?;
1931 Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1932 }
1933
1934 pub async fn merge_compaction_group(
1935 &self,
1936 left_group_id: CompactionGroupId,
1937 right_group_id: CompactionGroupId,
1938 ) -> Result<()> {
1939 let req = MergeCompactionGroupRequest {
1940 left_group_id,
1941 right_group_id,
1942 };
1943 self.inner.merge_compaction_group(req).await?;
1944 Ok(())
1945 }
1946
1947 pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1949 let request = ListRateLimitsRequest {};
1950 let resp = self.inner.list_rate_limits(request).await?;
1951 Ok(resp.rate_limits)
1952 }
1953
1954 pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
1955 let request = ListCdcProgressRequest {};
1956 let resp = self.inner.list_cdc_progress(request).await?;
1957 Ok(resp.cdc_progress)
1958 }
1959
1960 pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
1961 let request = ListRefreshTableStatesRequest {};
1962 let resp = self.inner.list_refresh_table_states(request).await?;
1963 Ok(resp.states)
1964 }
1965
1966 pub async fn list_iceberg_compaction_status(
1967 &self,
1968 ) -> Result<Vec<list_iceberg_compaction_status_response::IcebergCompactionStatus>> {
1969 let request = ListIcebergCompactionStatusRequest {};
1970 let resp = self.inner.list_iceberg_compaction_status(request).await?;
1971 Ok(resp.statuses)
1972 }
1973
1974 pub async fn list_sink_log_store_tables(
1975 &self,
1976 ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1977 let request = ListSinkLogStoreTablesRequest {};
1978 let resp = self.inner.list_sink_log_store_tables(request).await?;
1979 Ok(resp.tables)
1980 }
1981
1982 pub async fn create_iceberg_table(
1983 &self,
1984 table_job_info: PbTableJobInfo,
1985 sink_job_info: PbSinkJobInfo,
1986 iceberg_source: PbSource,
1987 if_not_exists: bool,
1988 ) -> Result<WaitVersion> {
1989 let request = CreateIcebergTableRequest {
1990 table_info: Some(table_job_info),
1991 sink_info: Some(sink_job_info),
1992 iceberg_source: Some(iceberg_source),
1993 if_not_exists,
1994 };
1995
1996 let resp = Box::pin(self.inner.create_iceberg_table(request)).await?;
1997 Ok(resp
1998 .version
1999 .ok_or_else(|| anyhow!("wait version not set"))?)
2000 }
2001
2002 pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
2003 let request = ListIcebergTablesRequest {};
2004 let resp = self.inner.list_iceberg_tables(request).await?;
2005 Ok(resp.iceberg_tables)
2006 }
2007
2008 pub async fn get_cluster_stack_trace(
2010 &self,
2011 actor_traces_format: ActorTracesFormat,
2012 ) -> Result<StackTraceResponse> {
2013 let request = StackTraceRequest {
2014 actor_traces_format: actor_traces_format as i32,
2015 };
2016 let resp = self.inner.stack_trace(request).await?;
2017 Ok(resp)
2018 }
2019
2020 pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
2021 let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
2022 self.inner.set_sync_log_store_aligned(request).await?;
2023 Ok(())
2024 }
2025
2026 pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
2027 self.inner.refresh(request).await
2028 }
2029
2030 pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
2031 let request = ListUnmigratedTablesRequest {};
2032 let resp = self.inner.list_unmigrated_tables(request).await?;
2033 Ok(resp
2034 .tables
2035 .into_iter()
2036 .map(|table| (table.table_id, table.table_name))
2037 .collect())
2038 }
2039}
2040
2041#[async_trait]
2042impl HummockMetaClient for MetaClient {
2043 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
2044 let req = UnpinVersionBeforeRequest {
2045 context_id: self.worker_id(),
2046 unpin_version_before,
2047 };
2048 self.inner.unpin_version_before(req).await?;
2049 Ok(())
2050 }
2051
2052 async fn get_current_version(&self) -> Result<HummockVersion> {
2053 let req = GetCurrentVersionRequest::default();
2054 Ok(HummockVersion::from_rpc_protobuf(
2055 &self
2056 .inner
2057 .get_current_version(req)
2058 .await?
2059 .current_version
2060 .unwrap(),
2061 ))
2062 }
2063
2064 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
2065 let resp = self
2066 .inner
2067 .get_new_object_ids(GetNewObjectIdsRequest { number })
2068 .await?;
2069 Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
2070 }
2071
2072 async fn commit_epoch_with_change_log(
2073 &self,
2074 _epoch: HummockEpoch,
2075 _sync_result: SyncResult,
2076 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
2077 ) -> Result<()> {
2078 panic!("Only meta service can commit_epoch in production.")
2079 }
2080
2081 async fn trigger_manual_compaction(
2082 &self,
2083 compaction_group_id: CompactionGroupId,
2084 table_id: JobId,
2085 level: u32,
2086 sst_ids: Vec<HummockSstableId>,
2087 exclusive: bool,
2088 ) -> Result<bool> {
2089 let req = TriggerManualCompactionRequest {
2091 compaction_group_id,
2092 table_id,
2093 level,
2096 sst_ids,
2097 exclusive: Some(exclusive),
2098 ..Default::default()
2099 };
2100
2101 let resp = self.inner.trigger_manual_compaction(req).await?;
2102 Ok(resp.should_retry.unwrap_or(false))
2103 }
2104
2105 async fn trigger_full_gc(
2106 &self,
2107 sst_retention_time_sec: u64,
2108 prefix: Option<String>,
2109 ) -> Result<()> {
2110 self.inner
2111 .trigger_full_gc(TriggerFullGcRequest {
2112 sst_retention_time_sec,
2113 prefix,
2114 })
2115 .await?;
2116 Ok(())
2117 }
2118
2119 async fn subscribe_compaction_event(
2120 &self,
2121 ) -> Result<(
2122 UnboundedSender<SubscribeCompactionEventRequest>,
2123 BoxStream<'static, CompactionEventItem>,
2124 )> {
2125 let (request_sender, request_receiver) =
2126 unbounded_channel::<SubscribeCompactionEventRequest>();
2127 request_sender
2128 .send(SubscribeCompactionEventRequest {
2129 event: Some(subscribe_compaction_event_request::Event::Register(
2130 Register {
2131 context_id: self.worker_id(),
2132 },
2133 )),
2134 create_at: SystemTime::now()
2135 .duration_since(SystemTime::UNIX_EPOCH)
2136 .expect("Clock may have gone backwards")
2137 .as_millis() as u64,
2138 })
2139 .context("Failed to subscribe compaction event")?;
2140
2141 let stream = self
2142 .inner
2143 .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
2144 request_receiver,
2145 )))
2146 .await?;
2147
2148 Ok((request_sender, Box::pin(stream)))
2149 }
2150
/// Trait-level entry point that forwards `epoch` and `table_id` unchanged to
/// `self.get_version_by_epoch` and returns its result.
// NOTE(review): presumably the call resolves to an inherent `MetaClient` method of
// the same name (defined outside this chunk) rather than recursing — confirm.
2151 async fn get_version_by_epoch(
2152 &self,
2153 epoch: HummockEpoch,
2154 table_id: TableId,
2155 ) -> Result<PbHummockVersion> {
2156 self.get_version_by_epoch(epoch, table_id).await
2157 }
2158
2159 async fn subscribe_iceberg_compaction_event(
2160 &self,
2161 ) -> Result<(
2162 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
2163 BoxStream<'static, IcebergCompactionEventItem>,
2164 )> {
2165 let (request_sender, request_receiver) =
2166 unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
2167 request_sender
2168 .send(SubscribeIcebergCompactionEventRequest {
2169 event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
2170 IcebergRegister {
2171 context_id: self.worker_id(),
2172 },
2173 )),
2174 create_at: SystemTime::now()
2175 .duration_since(SystemTime::UNIX_EPOCH)
2176 .expect("Clock may have gone backwards")
2177 .as_millis() as u64,
2178 })
2179 .context("Failed to subscribe compaction event")?;
2180
2181 let stream = self
2182 .inner
2183 .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
2184 request_receiver,
2185 )))
2186 .await?;
2187
2188 Ok((request_sender, Box::pin(stream)))
2189 }
2190
/// Trait-level entry point that forwards all six arguments, unchanged and in the
/// same order, to `self.get_table_change_logs` and returns its result.
// NOTE(review): presumably the call resolves to an inherent `MetaClient` method of
// the same name (defined outside this chunk) rather than recursing — confirm.
2191 async fn get_table_change_logs(
2192 &self,
2193 epoch_only: bool,
2194 start_epoch_inclusive: Option<u64>,
2195 end_epoch_inclusive: Option<u64>,
2196 table_ids: Option<HashSet<TableId>>,
2197 exclude_empty: bool,
2198 limit: Option<u32>,
2199 ) -> Result<TableChangeLogs> {
2200 self.get_table_change_logs(
2201 epoch_only,
2202 start_epoch_inclusive,
2203 end_epoch_inclusive,
2204 table_ids,
2205 exclude_empty,
2206 limit,
2207 )
2208 .await
2209 }
2210}
2211
2212#[async_trait]
2213impl TelemetryInfoFetcher for MetaClient {
2214 async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
2215 let resp = self
2216 .get_telemetry_info()
2217 .await
2218 .map_err(|e| e.to_report_string())?;
2219 let tracking_id = resp.get_tracking_id().ok();
2220 Ok(tracking_id.map(|id| id.to_owned()))
2221 }
2222}
2223
/// Sink coordination service client bound to a [`Channel`] transport.
2224pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
2225
/// Bundle of the per-service gRPC clients used to talk to the meta server.
///
/// `GrpcMetaClientCore::new` builds every field from clones of a single [`Channel`],
/// so the whole bundle is replaced together when the connection changes.
2226#[derive(Debug, Clone)]
2227struct GrpcMetaClientCore {
2228 cluster_client: ClusterServiceClient<Channel>,
2229 meta_member_client: MetaMemberServiceClient<Channel>,
2230 heartbeat_client: HeartbeatServiceClient<Channel>,
2231 ddl_client: DdlServiceClient<Channel>,
2232 hummock_client: HummockManagerServiceClient<Channel>,
2233 notification_client: NotificationServiceClient<Channel>,
2234 stream_client: StreamManagerServiceClient<Channel>,
2235 user_client: UserServiceClient<Channel>,
2236 scale_client: ScaleServiceClient<Channel>,
2237 backup_client: BackupServiceClient<Channel>,
2238 telemetry_client: TelemetryInfoServiceClient<Channel>,
2239 system_params_client: SystemParamsServiceClient<Channel>,
2240 session_params_client: SessionParamServiceClient<Channel>,
2241 serving_client: ServingServiceClient<Channel>,
2242 cloud_client: CloudServiceClient<Channel>,
2243 sink_coordinate_client: SinkCoordinationRpcClient,
2244 event_log_client: EventLogServiceClient<Channel>,
2245 cluster_limit_client: ClusterLimitServiceClient<Channel>,
2246 hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
2247 monitor_client: MonitorServiceClient<Channel>,
2248}
2249
2250impl GrpcMetaClientCore {
2251 pub(crate) fn new(channel: Channel) -> Self {
2252 let cluster_client = ClusterServiceClient::new(channel.clone());
2253 let meta_member_client = MetaMemberClient::new(channel.clone());
2254 let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
2255 let ddl_client =
2256 DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2257 let hummock_client =
2258 HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2259 let notification_client =
2260 NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2261 let stream_client =
2262 StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2263 let user_client = UserServiceClient::new(channel.clone());
2264 let scale_client =
2265 ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2266 let backup_client = BackupServiceClient::new(channel.clone());
2267 let telemetry_client =
2268 TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
2269 let system_params_client = SystemParamsServiceClient::new(channel.clone());
2270 let session_params_client = SessionParamServiceClient::new(channel.clone());
2271 let serving_client = ServingServiceClient::new(channel.clone());
2272 let cloud_client = CloudServiceClient::new(channel.clone());
2273 let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
2274 .max_decoding_message_size(usize::MAX);
2275 let event_log_client = EventLogServiceClient::new(channel.clone());
2276 let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
2277 let hosted_iceberg_catalog_service_client =
2278 HostedIcebergCatalogServiceClient::new(channel.clone());
2279 let monitor_client = configured_monitor_service_client(MonitorServiceClient::new(channel));
2280
2281 GrpcMetaClientCore {
2282 cluster_client,
2283 meta_member_client,
2284 heartbeat_client,
2285 ddl_client,
2286 hummock_client,
2287 notification_client,
2288 stream_client,
2289 user_client,
2290 scale_client,
2291 backup_client,
2292 telemetry_client,
2293 system_params_client,
2294 session_params_client,
2295 serving_client,
2296 cloud_client,
2297 sink_coordinate_client,
2298 event_log_client,
2299 cluster_limit_client,
2300 hosted_iceberg_catalog_service_client,
2301 monitor_client,
2302 }
2303 }
2304}
2305
/// Handle to the gRPC clients of the meta server.
///
/// Cloning copies the refresh-request sender and the `Arc` around the shared core,
/// so all clones observe the same set of clients.
2306#[derive(Debug, Clone)]
2310struct GrpcMetaClient {
    // Carries a oneshot reply sender to the member monitor task; the result of the
    // refresh that the request triggers is delivered back on that reply sender.
2311 member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    // Clients currently in use; swapped as a whole by `MetaMemberManagement::recreate_core`.
2312 core: Arc<RwLock<GrpcMetaClientCore>>,
2313}
2314
/// Meta member service client bound to a [`Channel`] transport.
2315type MetaMemberClient = MetaMemberServiceClient<Channel>;
2316
/// Known meta member addresses, each with an optional cached member client.
2317struct MetaMemberGroup {
    // `None` means no client has been created for that address yet;
    // `refresh_members` connects on first use and stores the client here.
2318 members: LruCache<http::Uri, Option<MetaMemberClient>>,
2319}
2320
/// State owned by the member monitor task: how to discover members, which leader
/// is currently connected, and the shared core to replace on a leader change.
2321struct MetaMemberManagement {
    // Shared with `GrpcMetaClient::core`; overwritten by `recreate_core`.
2322 core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    // `Left`: a single member client. `Right`: a group of member addresses to probe.
2323 members: Either<MetaMemberClient, MetaMemberGroup>,
    // Leader address the current core is connected to.
2324 current_leader: http::Uri,
    // Supplies `meta_leader_lease_secs`, which bounds reconnect retries.
2325 meta_config: Arc<MetaConfig>,
2326}
2327
2328impl MetaMemberManagement {
/// Tick period of the member monitor's refresh interval.
2329 const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
2330
2331 fn host_address_to_uri(addr: HostAddress) -> http::Uri {
2332 format!("http://{}:{}", addr.host, addr.port)
2333 .parse()
2334 .unwrap()
2335 }
2336
2337 async fn recreate_core(&self, channel: Channel) {
2338 let mut core = self.core_ref.write().await;
2339 *core = GrpcMetaClientCore::new(channel);
2340 }
2341
/// Re-discovers the meta leader and, if it changed, reconnects the shared core.
///
/// With a single member client (`Either::Left`) the member list is fetched from
/// it directly. With a member group (`Either::Right`) the known addresses are
/// probed in turn until one answers, and any newly reported member address is
/// added to the group.
///
/// # Errors
/// Fails if the member list cannot be fetched or if connecting to a newly
/// discovered leader does not succeed within the retry bound.
///
/// # Panics
/// Panics if a reported member has no address, or if an address does not form a
/// valid URI.
2342 async fn refresh_members(&mut self) -> Result<()> {
2343 let leader_addr = match self.members.as_mut() {
2344 Either::Left(client) => {
2345 let resp = client
2346 .to_owned()
2347 .members(MembersRequest {})
2348 .await
2349 .map_err(RpcError::from_meta_status)?;
2350 let resp = resp.into_inner();
2351 resp.members.into_iter().find(|member| member.is_leader)
2352 }
2353 Either::Right(member_group) => {
    // Outcome of the last probe attempted; stays `None` only if the group is empty.
2354 let mut fetched_members = None;
2355
2356 for (addr, client) in &mut member_group.members {
2357 let members: Result<_> = try {
    // Reuse the cached client, or connect now and cache the new client.
2358 let mut client = match client {
2359 Some(cached_client) => cached_client.to_owned(),
2360 None => {
2361 let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
2362 let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
2363 .await
2364 .context("failed to create client")?;
2365 let new_client: MetaMemberClient =
2366 MetaMemberServiceClient::new(channel);
2367 *client = Some(new_client.clone());
2368
2369 new_client
2370 }
2371 };
2372
2373 let resp = client
2374 .members(MembersRequest {})
2375 .await
2376 .context("failed to fetch members")?;
2377
2378 resp.into_inner().members
2379 };
2380
    // Stop at the first member that answers; otherwise remember the error and go on.
2381 let fetched = members.is_ok();
2382 fetched_members = Some(members);
2383 if fetched {
2384 break;
2385 }
2386 }
2387
2388 let members = fetched_members
2389 .context("no member available in the list")?
2390 .context("could not refresh members")?;
2391
    // Find the leader among the reported members. If several are flagged, the last wins.
2392 let mut leader = None;
2394 for member in members {
2395 if member.is_leader {
2396 leader = Some(member.clone());
2397 }
2398
2399 let addr = Self::host_address_to_uri(member.address.unwrap());
    // Addresses are only ever added here; nothing is removed from the group.
2400 if !member_group.members.contains(&addr) {
2402 tracing::info!("new meta member joined: {}", addr);
2403 member_group.members.put(addr, None);
2404 }
2405 }
2406
2407 leader
2408 }
2409 };
2410
2411 if let Some(leader) = leader_addr {
2412 let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
2413
2414 if discovered_leader != self.current_leader {
2415 tracing::info!("new meta leader {} discovered", discovered_leader);
2416
    // Retry delays add up to less than `meta_leader_lease_secs` (`exceed` is false).
2417 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
2418 Duration::from_secs(self.meta_config.meta_leader_lease_secs),
2419 false,
2420 );
2421
2422 let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
2423 let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
2424 GrpcMetaClient::connect_to_endpoint(endpoint).await
2425 })
2426 .await?;
2427
    // Swap in the new clients before recording the new leader address.
2428 self.recreate_core(channel).await;
2429 self.current_leader = discovered_leader;
2430 }
2431 }
2432
2433 Ok(())
2434 }
2435}
2436
2437impl GrpcMetaClient {
/// HTTP/2 keep-alive interval, in seconds, applied in `connect_to_endpoint`.
2438 const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
/// Keep-alive timeout, in seconds, applied in `connect_to_endpoint`.
2440 const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
/// Base interval, in milliseconds, of the exponential backoff in `retry_strategy_to_bound`.
2442 const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
/// Upper limit, in milliseconds, of a single backoff delay in `retry_strategy_to_bound`.
2444 const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
2446
2447 fn start_meta_member_monitor(
2448 &self,
2449 init_leader_addr: http::Uri,
2450 members: Either<MetaMemberClient, MetaMemberGroup>,
2451 force_refresh_receiver: Receiver<Sender<Result<()>>>,
2452 meta_config: Arc<MetaConfig>,
2453 ) -> Result<()> {
2454 let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
2455 let current_leader = init_leader_addr;
2456
2457 let enable_period_tick = matches!(members, Either::Right(_));
2458
2459 let member_management = MetaMemberManagement {
2460 core_ref,
2461 members,
2462 current_leader,
2463 meta_config,
2464 };
2465
2466 let mut force_refresh_receiver = force_refresh_receiver;
2467
2468 tokio::spawn(async move {
2469 let mut member_management = member_management;
2470 let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
2471
2472 loop {
2473 let event: Option<Sender<Result<()>>> = if enable_period_tick {
2474 tokio::select! {
2475 _ = ticker.tick() => None,
2476 result_sender = force_refresh_receiver.recv() => {
2477 if result_sender.is_none() {
2478 break;
2479 }
2480
2481 result_sender
2482 },
2483 }
2484 } else {
2485 let result_sender = force_refresh_receiver.recv().await;
2486
2487 if result_sender.is_none() {
2488 break;
2489 }
2490
2491 result_sender
2492 };
2493
2494 let tick_result = member_management.refresh_members().await;
2495 if let Err(e) = tick_result.as_ref() {
2496 tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
2497 }
2498
2499 if let Some(sender) = event {
2500 let _resp = sender.send(tick_result);
2502 }
2503 }
2504 });
2505
2506 Ok(())
2507 }
2508
2509 async fn force_refresh_leader(&self) -> Result<()> {
2510 let (sender, receiver) = oneshot::channel();
2511
2512 self.member_monitor_event_sender
2513 .send(sender)
2514 .await
2515 .map_err(|e| anyhow!(e))?;
2516
2517 receiver.await.map_err(|e| anyhow!(e))?
2518 }
2519
2520 pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
2522 let (channel, addr) = match strategy {
2523 MetaAddressStrategy::LoadBalance(addr) => {
2524 Self::try_build_rpc_channel(vec![addr.clone()]).await
2525 }
2526 MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
2527 }?;
2528 let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
2529 let client = GrpcMetaClient {
2530 member_monitor_event_sender: force_refresh_sender,
2531 core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
2532 };
2533
2534 let meta_member_client = client.core.read().await.meta_member_client.clone();
2535 let members = match strategy {
2536 MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
2537 MetaAddressStrategy::List(addrs) => {
2538 let mut members = LruCache::new(20.try_into().unwrap());
2539 for addr in addrs {
2540 members.put(addr.clone(), None);
2541 }
2542 members.put(addr.clone(), Some(meta_member_client));
2543
2544 Either::Right(MetaMemberGroup { members })
2545 }
2546 };
2547
2548 client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;
2549
2550 client.force_refresh_leader().await?;
2551
2552 Ok(client)
2553 }
2554
2555 fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2556 Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2557 }
2558
2559 pub(crate) async fn try_build_rpc_channel(
2560 addrs: impl IntoIterator<Item = http::Uri>,
2561 ) -> Result<(Channel, http::Uri)> {
2562 let endpoints: Vec<_> = addrs
2563 .into_iter()
2564 .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2565 .collect();
2566
2567 let mut last_error = None;
2568
2569 for (endpoint, addr) in endpoints {
2570 match Self::connect_to_endpoint(endpoint).await {
2571 Ok(channel) => {
2572 tracing::info!("Connect to meta server {} successfully", addr);
2573 return Ok((channel, addr));
2574 }
2575 Err(e) => {
2576 tracing::warn!(
2577 error = %e.as_report(),
2578 "Failed to connect to meta server {}, trying again",
2579 addr,
2580 );
2581 last_error = Some(e);
2582 }
2583 }
2584 }
2585
2586 if let Some(last_error) = last_error {
2587 Err(anyhow::anyhow!(last_error)
2588 .context("failed to connect to all meta servers")
2589 .into())
2590 } else {
2591 bail!("no meta server address provided")
2592 }
2593 }
2594
2595 async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2596 let channel = endpoint
2597 .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2598 .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2599 .connect_timeout(Duration::from_secs(5))
2600 .monitored_connect("grpc-meta-client", Default::default())
2601 .await?
2602 .wrapped();
2603
2604 Ok(channel)
2605 }
2606
2607 pub(crate) fn retry_strategy_to_bound(
2608 high_bound: Duration,
2609 exceed: bool,
2610 ) -> impl Iterator<Item = Duration> {
2611 let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2612 .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2613 .map(jitter);
2614
2615 let mut sum = Duration::default();
2616
2617 iter.take_while(move |duration| {
2618 sum += *duration;
2619
2620 if exceed {
2621 sum < high_bound + *duration
2622 } else {
2623 sum < high_bound
2624 }
2625 })
2626 }
2627}
2628
/// Invokes the callback macro `$macro` with the list of meta RPCs.
///
/// Each entry has the form `{ client_field, rpc_method, RequestType, ResponseType }`,
/// where `client_field` names a field of `GrpcMetaClientCore`. Entries are separated
/// by leading commas. For the two compaction event subscriptions the request slot
/// holds an `impl tonic::IntoStreamingRequest<...>` type, and streaming RPCs use
/// `Streaming<...>` as the response type.
2629macro_rules! for_all_meta_rpc {
2630 ($macro:ident) => {
2631 $macro! {
2632 { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2633 ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2634 ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2635 ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2636 ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2637 ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2638 ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2639 ,{ stream_client, flush, FlushRequest, FlushResponse }
2640 ,{ stream_client, pause, PauseRequest, PauseResponse }
2641 ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2642 ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2643 ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2644 ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2645 ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2646 ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2647 ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
2648 ,{ stream_client, list_sink_log_store_tables, ListSinkLogStoreTablesRequest, ListSinkLogStoreTablesResponse }
2649 ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2650 ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2651 ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2652 ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2653 ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
2654 ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
2655 ,{ stream_client, list_iceberg_compaction_status, ListIcebergCompactionStatusRequest, ListIcebergCompactionStatusResponse }
2656 ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
2657 ,{ stream_client, alter_source_properties_safe, AlterSourcePropertiesSafeRequest, AlterSourcePropertiesSafeResponse }
2658 ,{ stream_client, reset_source_splits, ResetSourceSplitsRequest, ResetSourceSplitsResponse }
2659 ,{ stream_client, inject_source_offsets, InjectSourceOffsetsRequest, InjectSourceOffsetsResponse }
2660 ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
2661 ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
2662 ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
2663 ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
2664 ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
2665 ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
2666 ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2667 ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2668 ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2669 ,{ ddl_client, alter_subscription_retention, AlterSubscriptionRetentionRequest, AlterSubscriptionRetentionResponse }
2670 ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2671 ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2672 ,{ ddl_client, alter_backfill_parallelism, AlterBackfillParallelismRequest, AlterBackfillParallelismResponse }
2673 ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
2674 ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
2675 ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
2676 ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2677 ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
2678 ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2679 ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2680 ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2681 ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2682 ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2683 ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2684 ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2685 ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2686 ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2687 ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2688 ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2689 ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2690 ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2691 ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2692 ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
2693 ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2694 ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2695 ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2696 ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2697 ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2698 ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2699 ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2700 ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2701 ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2702 ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2703 ,{ ddl_client, risectl_resume_backfill, RisectlResumeBackfillRequest, RisectlResumeBackfillResponse }
2704 ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2705 ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2706 ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2707 ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2708 ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2709 ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2710 ,{ ddl_client, wait, WaitRequest, WaitResponse }
2711 ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2712 ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2713 ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2714 ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
2715 ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
2716 ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
2717 ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2718 ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2719 ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2720 ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2721 ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2722 ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2723 ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2724 ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
2725 ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2726 ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2727 ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2728 ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2729 ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2730 ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2731 ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2732 ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2733 ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2734 ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2735 ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2736 ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2737 ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2738 ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2739 ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
2740 ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2741 ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2742 ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2743 ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2744 ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2745 ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2746 ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2747 ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2748 ,{ hummock_client, get_table_change_logs, GetTableChangeLogsRequest, GetTableChangeLogsResponse }
2749 ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2750 ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2751 ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2752 ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2753 ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2754 ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
2755 ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2756 ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2757 ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2758 ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2759 ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2760 ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2761 ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2762 ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2763 ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2764 ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2765 ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2766 ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2767 ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2768 ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2769 ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2770 ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2771 ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2772 ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
2773 ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
2774 }
2775 };
2776}
2777
2778impl GrpcMetaClient {
2779 async fn refresh_client_if_needed(&self, code: Code) {
2780 if matches!(
2781 code,
2782 Code::Unknown | Code::Unimplemented | Code::Unavailable
2783 ) {
2784 tracing::debug!("matching tonic code {}", code);
2785 let (result_sender, result_receiver) = oneshot::channel();
2786 if self
2787 .member_monitor_event_sender
2788 .try_send(result_sender)
2789 .is_ok()
2790 {
2791 if let Ok(Err(e)) = result_receiver.await {
2792 tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2793 }
2794 } else {
2795 tracing::debug!("skipping the current refresh, somewhere else is already doing it")
2796 }
2797 }
2798 }
2799}
2800
// Passes the callback macro `meta_rpc_client_method_impl` to `for_all_meta_rpc!`,
// so it is expanded over every listed RPC entry inside this `impl GrpcMetaClient`.
// NOTE(review): `meta_rpc_client_method_impl` is defined outside this chunk;
// presumably it generates one method per entry — confirm against its definition.
2801impl GrpcMetaClient {
2802 for_all_meta_rpc! { meta_rpc_client_method_impl }
2803}