use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::thread;
use std::time::{Duration, SystemTime};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use cluster_limit_service_client::ClusterLimitServiceClient;
use either::Either;
use futures::stream::BoxStream;
use list_rate_limits_response::RateLimitInfo;
use lru::LruCache;
use replace_job_plan::ReplaceJob;
use risingwave_common::RW_VERSION;
use risingwave_common::catalog::{
    AlterDatabaseParam, FunctionId, IndexId, ObjectId, SecretId, TableId,
};
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::id::{
    ConnectionId, DatabaseId, JobId, SchemaId, SinkId, SubscriptionId, UserId, ViewId, WorkerId,
};
use risingwave_common::monitor::EndpointExt;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_error::bail;
use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
use risingwave_hummock_sdk::change_log::{TableChangeLog, TableChangeLogs};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockEpoch, HummockVersionId, ObjectIdRange, SyncResult,
};
use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
use risingwave_pb::backup_service::*;
use risingwave_pb::catalog::{
    Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView, Table,
};
use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
use risingwave_pb::cloud_service::*;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, OptionalUint32, OptionalUint64, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::get_table_change_logs_request::PbTableFilter;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::*;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Register as IcebergRegister;
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_pb::id::{ActorId, FragmentId, HummockSstableId, SourceId};
use risingwave_pb::meta::alter_connector_props_request::{
    AlterConnectorPropsObject, AlterIcebergTableIds, ExtraOptions,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
use risingwave_pb::meta::hosted_iceberg_catalog_service_client::HostedIcebergCatalogServiceClient;
use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
use risingwave_pb::meta::{FragmentDistribution, *};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self};
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Endpoint;
use tonic::{Code, Request, Streaming};

use crate::channel::{Channel, WrappedChannelExt};
use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{
    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
    IcebergCompactionEventItem,
};
use crate::meta_rpc_client_method_impl;

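/// Client handle to the meta service, wrapping a shared [`GrpcMetaClient`].
/// Cloning is lightweight, so a `MetaClient` can be freely handed to
/// concurrent tasks.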
#[derive(Clone, Debug)]
pub struct MetaClient {
    worker_id: WorkerId,
    worker_type: WorkerType,
    host_addr: HostAddr,
    inner: GrpcMetaClient,
    meta_config: Arc<MetaConfig>,
    cluster_id: String,
    shutting_down: Arc<AtomicBool>,
}

impl MetaClient {
    pub fn worker_id(&self) -> WorkerId {
        self.worker_id
    }

    pub fn host_addr(&self) -> &HostAddr {
        &self.host_addr
    }

    pub fn worker_type(&self) -> WorkerType {
        self.worker_type
    }

    pub fn cluster_id(&self) -> &str {
        &self.cluster_id
    }

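    /// Subscribes to notifications from the meta service.
    ///
    /// The call is retried with a jittered exponential backoff bounded by
    /// `max_heartbeat_interval_secs`, so transient connection errors do not
    /// immediately bubble up to the caller.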
    pub async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> Result<Streaming<SubscribeResponse>> {
        let request = SubscribeRequest {
            subscribe_type: subscribe_type as i32,
            host: Some(self.host_addr.to_protobuf()),
            worker_id: self.worker_id(),
        };

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.subscribe(request).await
        })
        .await
    }

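    /// Creates a connection catalog object from the given payload.
    ///
    /// Like most DDL RPCs below, this returns the catalog `WaitVersion`
    /// reported by meta; a response that carries no version is surfaced as an
    /// error ("wait version not set") rather than silently defaulted.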
    pub async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        req: create_connection_request::Payload,
    ) -> Result<WaitVersion> {
        let request = CreateConnectionRequest {
            name: connection_name,
            database_id,
            schema_id,
            owner_id,
            payload: Some(req),
        };
        let resp = self.inner.create_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = CreateSecretRequest {
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.create_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
        let request = ListConnectionsRequest {};
        let resp = self.inner.list_connections(request).await?;
        Ok(resp.connections)
    }

    pub async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropConnectionRequest {
            connection_id,
            cascade,
        };
        let resp = self.inner.drop_connection(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSecretRequest { secret_id, cascade };
        let resp = self.inner.drop_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

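    /// Registers the current node with the meta service and fetches the
    /// initial system parameters. Registration failure is treated as fatal:
    /// the error is logged and the process exits, so this is intended for
    /// node startup.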
    pub async fn register_new(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> (Self, SystemParamsReader) {
        let ret =
            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

        match ret {
            Ok(ret) => ret,
            Err(err) => {
                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
                std::process::exit(1);
            }
        }
    }

    async fn register_new_inner(
        addr_strategy: MetaAddressStrategy,
        worker_type: WorkerType,
        addr: &HostAddr,
        property: Property,
        meta_config: Arc<MetaConfig>,
    ) -> Result<(Self, SystemParamsReader)> {
        tracing::info!("register meta client using strategy: {}", addr_strategy);

        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
            true,
        );

        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
            retry_strategy,
            || async {
                let grpc_meta_client =
                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

                let add_worker_resp = grpc_meta_client
                    .add_worker_node(AddWorkerNodeRequest {
                        worker_type: worker_type as i32,
                        host: Some(addr.to_protobuf()),
                        property: Some(property.clone()),
                        resource: Some(risingwave_pb::common::worker_node::Resource {
                            rw_version: RW_VERSION.to_owned(),
                            total_memory_bytes: system_memory_available_bytes() as _,
                            total_cpu_cores: total_cpu_available() as _,
                            hostname: hostname(),
                        }),
                    })
                    .await
                    .context("failed to add worker node")?;

                let system_params_resp = grpc_meta_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await
                    .context("failed to get initial system params")?;

                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
            },
            // Only retry errors that did not originate from the meta server
            // implementation itself, e.g. transient connection failures.
            |e: &RpcError| !e.is_from_tonic_server_impl(),
        )
        .await;

        let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
        let worker_id = add_worker_resp
            .node_id
            .expect("AddWorkerNodeResponse::node_id is empty");

        let meta_client = Self {
            worker_id,
            worker_type,
            host_addr: addr.clone(),
            inner: grpc_meta_client,
            meta_config: meta_config.clone(),
            cluster_id: add_worker_resp.cluster_id,
            shutting_down: Arc::new(false.into()),
        };

        static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
        REPORT_PANIC.call_once(|| {
            let meta_client_clone = meta_client.clone();
            std::panic::update_hook(move |default_hook, info| {
                // Try to report the panic to the meta service before running
                // the default hook.
                meta_client_clone.try_add_panic_event_blocking(info, None);
                default_hook(info);
            });
        });

        Ok((meta_client, system_params_resp.params.unwrap().into()))
    }

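    /// Activates the current node in the cluster, confirming to meta that it
    /// is ready to serve. Retried with the same bounded backoff as
    /// registration.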
    pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
        let request = ActivateWorkerNodeRequest {
            host: Some(addr.to_protobuf()),
            node_id: self.worker_id,
        };
        let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
            Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
            true,
        );
        tokio_retry::Retry::spawn(retry_strategy, || async {
            let request = request.clone();
            self.inner.activate_worker_node(request).await
        })
        .await?;

        Ok(())
    }

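    /// Sends a heartbeat carrying a fresh resource snapshot (version, memory,
    /// CPU, hostname) to the meta service. If meta no longer knows this
    /// worker and we are not in the middle of shutting down, the process
    /// exits.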
    pub async fn send_heartbeat(&self) -> Result<()> {
        let request = HeartbeatRequest {
            node_id: self.worker_id,
            resource: Some(risingwave_pb::common::worker_node::Resource {
                rw_version: RW_VERSION.to_owned(),
                total_memory_bytes: system_memory_available_bytes() as _,
                total_cpu_cores: total_cpu_available() as _,
                hostname: hostname(),
            }),
        };
        let resp = self.inner.heartbeat(request).await?;
        if let Some(status) = resp.status
            && status.code() == risingwave_pb::common::status::Code::UnknownWorker
        {
            // Ignore the "unknown worker" error if we have already
            // unregistered ourselves; otherwise the worker has expired.
            if !self.shutting_down.load(Relaxed) {
                tracing::error!(message = status.message, "worker expired");
                std::process::exit(1);
            }
        }
        Ok(())
    }

    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
        let request = CreateDatabaseRequest { db: Some(db) };
        let resp = self.inner.create_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
        let request = CreateSchemaRequest {
            schema: Some(schema),
        };
        let resp = self.inner.create_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateMaterializedViewRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            resource_type: Some(PbStreamingJobResourceType {
                resource_type: Some(resource_type),
            }),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };
        let resp = self.inner.create_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_materialized_view(
        &self,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropMaterializedViewRequest { table_id, cascade };

        let resp = self.inner.drop_materialized_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSourceRequest {
            source: Some(source),
            fragment_graph: graph,
            if_not_exists,
        };

        let resp = self.inner.create_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateSinkRequest {
            sink: Some(sink),
            fragment_graph: Some(graph),
            dependencies: dependencies.into_iter().collect(),
            if_not_exists,
        };

        let resp = self.inner.create_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
        let request = CreateSubscriptionRequest {
            subscription: Some(subscription),
        };

        let resp = self.inner.create_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
        let request = CreateFunctionRequest {
            function: Some(function),
        };
        let resp = self.inner.create_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateTableRequest {
            materialized_view: Some(table),
            fragment_graph: Some(graph),
            source,
            job_type: job_type as _,
            if_not_exists,
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
        let request = CommentOnRequest {
            comment: Some(comment),
        };
        let resp = self.inner.comment_on(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_name(
        &self,
        object: alter_name_request::Object,
        name: &str,
    ) -> Result<WaitVersion> {
        let request = AlterNameRequest {
            object: Some(object),
            new_name: name.to_owned(),
        };
        let resp = self.inner.alter_name(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<WaitVersion> {
        let request = AlterOwnerRequest {
            object: Some(object),
            owner_id,
        };
        let resp = self.inner.alter_owner(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> Result<WaitVersion> {
        let request = AlterSubscriptionRetentionRequest {
            subscription_id,
            retention_seconds,
            definition,
        };
        let resp = self.inner.alter_subscription_retention(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<WaitVersion> {
        let request = match param {
            AlterDatabaseParam::BarrierIntervalMs(barrier_interval_ms) => {
                let barrier_interval_ms = OptionalUint32 {
                    value: barrier_interval_ms,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::BarrierIntervalMs(
                        barrier_interval_ms,
                    )),
                }
            }
            AlterDatabaseParam::CheckpointFrequency(checkpoint_frequency) => {
                let checkpoint_frequency = OptionalUint64 {
                    value: checkpoint_frequency,
                };
                AlterDatabaseParamRequest {
                    database_id,
                    param: Some(alter_database_param_request::Param::CheckpointFrequency(
                        checkpoint_frequency,
                    )),
                }
            }
        };
        let resp = self.inner.alter_database_param(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<WaitVersion> {
        let request = AlterSetSchemaRequest {
            new_schema_id,
            object: Some(object),
        };
        let resp = self.inner.alter_set_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
        let request = AlterSourceRequest {
            source: Some(source),
        };
        let resp = self.inner.alter_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterParallelismRequest {
            table_id: job_id,
            parallelism: Some(parallelism),
            deferred,
        };

        self.inner.alter_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<PbTableParallelism>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterBackfillParallelismRequest {
            table_id: job_id,
            parallelism,
            deferred,
        };

        self.inner.alter_backfill_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()> {
        let request = AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        };

        self.inner.alter_streaming_job_config(request).await?;
        Ok(())
    }

    pub async fn alter_fragment_parallelism(
        &self,
        fragment_ids: Vec<FragmentId>,
        parallelism: Option<PbTableParallelism>,
    ) -> Result<()> {
        let request = AlterFragmentParallelismRequest {
            fragment_ids,
            parallelism,
        };

        self.inner.alter_fragment_parallelism(request).await?;
        Ok(())
    }

    pub async fn alter_cdc_table_backfill_parallelism(
        &self,
        table_id: JobId,
        parallelism: PbTableParallelism,
    ) -> Result<()> {
        let request = AlterCdcTableBackfillParallelismRequest {
            table_id,
            parallelism: Some(parallelism),
        };
        self.inner
            .alter_cdc_table_backfill_parallelism(request)
            .await?;
        Ok(())
    }

    pub async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        let request = AlterResourceGroupRequest {
            table_id,
            resource_group,
            deferred,
        };

        self.inner.alter_resource_group(request).await?;
        Ok(())
    }

    pub async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> Result<WaitVersion> {
        let request = AlterSwapRenameRequest {
            object: Some(object),
        };
        let resp = self.inner.alter_swap_rename(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        value: Vec<u8>,
    ) -> Result<WaitVersion> {
        let request = AlterSecretRequest {
            secret_id,
            name: secret_name,
            database_id,
            schema_id,
            owner_id,
            value,
        };
        let resp = self.inner.alter_secret(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn replace_job(
        &self,
        graph: StreamFragmentGraph,
        replace_job: ReplaceJob,
    ) -> Result<WaitVersion> {
        let request = ReplaceJobPlanRequest {
            plan: Some(ReplaceJobPlan {
                fragment_graph: Some(graph),
                replace_job: Some(replace_job),
            }),
        };
        let resp = self.inner.replace_job_plan(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
        let request = AutoSchemaChangeRequest {
            schema_change: Some(schema_change),
        };
        let _ = self.inner.auto_schema_change(request).await?;
        Ok(())
    }

    pub async fn create_view(
        &self,
        view: PbView,
        dependencies: HashSet<ObjectId>,
    ) -> Result<WaitVersion> {
        let request = CreateViewRequest {
            view: Some(view),
            dependencies: dependencies.into_iter().collect(),
        };
        let resp = self.inner.create_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIndexRequest {
            index: Some(index),
            index_table: Some(table),
            fragment_graph: Some(graph),
            if_not_exists,
        };
        let resp = self.inner.create_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropTableRequest {
            source_id: source_id.map(risingwave_pb::ddl_service::drop_table_request::SourceId::Id),
            table_id,
            cascade,
        };

        let resp = self.inner.drop_table(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn compact_iceberg_table(&self, sink_id: SinkId) -> Result<u64> {
        let request = CompactIcebergTableRequest { sink_id };
        let resp = self.inner.compact_iceberg_table(request).await?;
        Ok(resp.task_id)
    }

    pub async fn expire_iceberg_table_snapshots(&self, sink_id: SinkId) -> Result<()> {
        let request = ExpireIcebergTableSnapshotsRequest { sink_id };
        let _resp = self.inner.expire_iceberg_table_snapshots(request).await?;
        Ok(())
    }

    pub async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<WaitVersion> {
        let request = DropViewRequest { view_id, cascade };
        let resp = self.inner.drop_view(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSourceRequest { source_id, cascade };
        let resp = self.inner.drop_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn reset_source(&self, source_id: SourceId) -> Result<WaitVersion> {
        let request = ResetSourceRequest { source_id };
        let resp = self.inner.reset_source(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSinkRequest { sink_id, cascade };
        let resp = self.inner.drop_sink(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropSubscriptionRequest {
            subscription_id,
            cascade,
        };
        let resp = self.inner.drop_subscription(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
        let request = DropIndexRequest { index_id, cascade };
        let resp = self.inner.drop_index(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_function(
        &self,
        function_id: FunctionId,
        cascade: bool,
    ) -> Result<WaitVersion> {
        let request = DropFunctionRequest {
            function_id,
            cascade,
        };
        let resp = self.inner.drop_function(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
        let request = DropDatabaseRequest { database_id };
        let resp = self.inner.drop_database(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
        let request = DropSchemaRequest { schema_id, cascade };
        let resp = self.inner.drop_schema(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
        let request = CreateUserRequest { user: Some(user) };
        let resp = self.inner.create_user(request).await?;
        Ok(resp.version)
    }

    pub async fn drop_user(&self, user_id: UserId) -> Result<u64> {
        let request = DropUserRequest { user_id };
        let resp = self.inner.drop_user(request).await?;
        Ok(resp.version)
    }

    pub async fn update_user(
        &self,
        user: UserInfo,
        update_fields: Vec<UpdateField>,
    ) -> Result<u64> {
        let request = UpdateUserRequest {
            user: Some(user),
            update_fields: update_fields
                .into_iter()
                .map(|field| field as i32)
                .collect::<Vec<_>>(),
        };
        let resp = self.inner.update_user(request).await?;
        Ok(resp.version)
    }

    pub async fn grant_privilege(
        &self,
        user_ids: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        with_grant_option: bool,
        granted_by: UserId,
    ) -> Result<u64> {
        let request = GrantPrivilegeRequest {
            user_ids,
            privileges,
            with_grant_option,
            granted_by,
        };
        let resp = self.inner.grant_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn revoke_privilege(
        &self,
        user_ids: Vec<UserId>,
        privileges: Vec<GrantPrivilege>,
        granted_by: UserId,
        revoke_by: UserId,
        revoke_grant_option: bool,
        cascade: bool,
    ) -> Result<u64> {
        let request = RevokePrivilegeRequest {
            user_ids,
            privileges,
            granted_by,
            revoke_by,
            revoke_grant_option,
            cascade,
        };
        let resp = self.inner.revoke_privilege(request).await?;
        Ok(resp.version)
    }

    pub async fn alter_default_privilege(
        &self,
        user_ids: Vec<UserId>,
        database_id: DatabaseId,
        schema_ids: Vec<SchemaId>,
        operation: AlterDefaultPrivilegeOperation,
        granted_by: UserId,
    ) -> Result<()> {
        let request = AlterDefaultPrivilegeRequest {
            user_ids,
            database_id,
            schema_ids,
            operation: Some(operation),
            granted_by,
        };
        self.inner.alter_default_privilege(request).await?;
        Ok(())
    }

    pub async fn unregister(&self) -> Result<()> {
        let request = DeleteWorkerNodeRequest {
            host: Some(self.host_addr.to_protobuf()),
        };
        self.inner.delete_worker_node(request).await?;
        self.shutting_down.store(true, Relaxed);
        Ok(())
    }

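    /// Best-effort variant of [`unregister`](Self::unregister): logs the
    /// outcome instead of returning an error, so it is safe to call on any
    /// shutdown path.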
    pub async fn try_unregister(&self) {
        match self.unregister().await {
            Ok(_) => {
                tracing::info!(
                    worker_id = %self.worker_id(),
                    "successfully unregistered from meta service",
                )
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    worker_id = %self.worker_id(),
                    "failed to unregister from meta service",
                );
            }
        }
    }

    pub async fn list_worker_nodes(
        &self,
        worker_type: Option<WorkerType>,
    ) -> Result<Vec<WorkerNode>> {
        let request = ListAllNodesRequest {
            worker_type: worker_type.map(Into::into),
            include_starting_nodes: true,
        };
        let resp = self.inner.list_all_nodes(request).await?;
        Ok(resp.nodes)
    }

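    /// Spawns a background task that sends a heartbeat every `min_interval`.
    /// A heartbeat taking longer than `3 * min_interval` is treated as a
    /// timeout and logged; the loop keeps running until the returned shutdown
    /// sender fires.
    ///
    /// A minimal usage sketch (hypothetical values, assuming a registered
    /// client on a tokio runtime):
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) =
    ///     MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_secs(1));
    /// // ... on shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```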
    pub fn start_heartbeat_loop(
        meta_client: MetaClient,
        min_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval_ticker = tokio::time::interval(min_interval);
            loop {
                tokio::select! {
                    biased;
                    // Shutdown is polled first (`biased`) so a pending tick
                    // never delays loop termination.
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat loop is stopped");
                        return;
                    }
                    _ = min_interval_ticker.tick() => {},
                }
                tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
                match tokio::time::timeout(
                    min_interval * 3,
                    meta_client.send_heartbeat(),
                )
                .await
                {
                    Ok(Ok(_)) => {}
                    Ok(Err(err)) => {
                        tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
                    }
                    Err(_) => {
                        tracing::warn!("Failed to send_heartbeat: timeout");
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }

    pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
        let request = RisectlListStateTablesRequest {};
        let resp = self.inner.risectl_list_state_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn risectl_resume_backfill(
        &self,
        request: RisectlResumeBackfillRequest,
    ) -> Result<()> {
        self.inner.risectl_resume_backfill(request).await?;
        Ok(())
    }

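    /// Asks meta to flush the given database, returning the id of the Hummock
    /// version that reflects the flush.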
    pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
        let request = FlushRequest { database_id };
        let resp = self.inner.flush(request).await?;
        Ok(resp.hummock_version_id)
    }

    pub async fn wait(&self) -> Result<WaitVersion> {
        let request = WaitRequest {};
        let resp = self.inner.wait(request).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn recover(&self) -> Result<()> {
        let request = RecoverRequest {};
        self.inner.recover(request).await?;
        Ok(())
    }

    pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
        let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
        let resp = self.inner.cancel_creating_jobs(request).await?;
        Ok(resp.canceled_jobs)
    }

    pub async fn list_table_fragments(
        &self,
        job_ids: &[JobId],
    ) -> Result<HashMap<JobId, TableFragmentInfo>> {
        let request = ListTableFragmentsRequest {
            table_ids: job_ids.to_vec(),
        };
        let resp = self.inner.list_table_fragments(request).await?;
        Ok(resp.table_fragments)
    }

    pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
        let resp = self
            .inner
            .list_streaming_job_states(ListStreamingJobStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_fragment_distributions(
        &self,
        include_node: bool,
    ) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_fragment_distribution(ListFragmentDistributionRequest {
                include_node: Some(include_node),
            })
            .await?;
        Ok(resp.distributions)
    }

    pub async fn list_creating_fragment_distribution(&self) -> Result<Vec<FragmentDistribution>> {
        let resp = self
            .inner
            .list_creating_fragment_distribution(ListCreatingFragmentDistributionRequest {
                include_node: Some(true),
            })
            .await?;
        Ok(resp.distributions)
    }

    pub async fn get_fragment_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Option<FragmentDistribution>> {
        let resp = self
            .inner
            .get_fragment_by_id(GetFragmentByIdRequest { fragment_id })
            .await?;
        Ok(resp.distribution)
    }

    pub async fn get_fragment_vnodes(
        &self,
        fragment_id: FragmentId,
    ) -> Result<Vec<(ActorId, Vec<u32>)>> {
        let resp = self
            .inner
            .get_fragment_vnodes(GetFragmentVnodesRequest { fragment_id })
            .await?;
        Ok(resp
            .actor_vnodes
            .into_iter()
            .map(|actor| (actor.actor_id, actor.vnode_indices))
            .collect())
    }

    pub async fn get_actor_vnodes(&self, actor_id: ActorId) -> Result<Vec<u32>> {
        let resp = self
            .inner
            .get_actor_vnodes(GetActorVnodesRequest { actor_id })
            .await?;
        Ok(resp.vnode_indices)
    }

    pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
        let resp = self
            .inner
            .list_actor_states(ListActorStatesRequest {})
            .await?;
        Ok(resp.states)
    }

    pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
        let resp = self
            .inner
            .list_actor_splits(ListActorSplitsRequest {})
            .await?;

        Ok(resp.actor_splits)
    }

    pub async fn pause(&self) -> Result<PauseResponse> {
        let request = PauseRequest {};
        let resp = self.inner.pause(request).await?;
        Ok(resp)
    }

    pub async fn resume(&self) -> Result<ResumeResponse> {
        let request = ResumeRequest {};
        let resp = self.inner.resume(request).await?;
        Ok(resp)
    }

    pub async fn apply_throttle(
        &self,
        throttle_target: PbThrottleTarget,
        throttle_type: risingwave_pb::common::PbThrottleType,
        id: u32,
        rate: Option<u32>,
    ) -> Result<ApplyThrottleResponse> {
        let request = ApplyThrottleRequest {
            throttle_target: throttle_target as i32,
            throttle_type: throttle_type as i32,
            id,
            rate,
        };
        let resp = self.inner.apply_throttle(request).await?;
        Ok(resp)
    }

    pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
        let resp = self
            .inner
            .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
            .await?;
        Ok(resp.get_status().unwrap())
    }

    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
        let request = GetClusterInfoRequest {};
        let resp = self.inner.get_cluster_info(request).await?;
        Ok(resp)
    }

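    /// Submits per-worker reschedules at the given plan `revision`; returns
    /// whether the reschedule succeeded together with the resulting revision.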
    pub async fn reschedule(
        &self,
        worker_reschedules: HashMap<u32, PbWorkerReschedule>,
        revision: u64,
        resolve_no_shuffle_upstream: bool,
    ) -> Result<(bool, u64)> {
        let request = RescheduleRequest {
            revision,
            resolve_no_shuffle_upstream,
            worker_reschedules,
        };
        let resp = self.inner.reschedule(request).await?;
        Ok((resp.success, resp.revision))
    }

    pub async fn risectl_get_pinned_versions_summary(
        &self,
    ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
        let request = RiseCtlGetPinnedVersionsSummaryRequest {};
        self.inner
            .rise_ctl_get_pinned_versions_summary(request)
            .await
    }

    pub async fn risectl_get_checkpoint_hummock_version(
        &self,
    ) -> Result<RiseCtlGetCheckpointVersionResponse> {
        let request = RiseCtlGetCheckpointVersionRequest {};
        self.inner.rise_ctl_get_checkpoint_version(request).await
    }

    pub async fn risectl_pause_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
        let request = RiseCtlPauseVersionCheckpointRequest {};
        self.inner.rise_ctl_pause_version_checkpoint(request).await
    }

    pub async fn risectl_resume_hummock_version_checkpoint(
        &self,
    ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
        let request = RiseCtlResumeVersionCheckpointRequest {};
        self.inner.rise_ctl_resume_version_checkpoint(request).await
    }

    pub async fn init_metadata_for_replay(
        &self,
        tables: Vec<PbTable>,
        compaction_groups: Vec<CompactionGroupInfo>,
    ) -> Result<()> {
        let req = InitMetadataForReplayRequest {
            tables,
            compaction_groups,
        };
        let _resp = self.inner.init_metadata_for_replay(req).await?;
        Ok(())
    }

    pub async fn replay_version_delta(
        &self,
        version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let req = ReplayVersionDeltaRequest {
            version_delta: Some(version_delta.into()),
        };
        let resp = self.inner.replay_version_delta(req).await?;
        Ok((
            HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
            resp.modified_compaction_groups,
        ))
    }

    pub async fn list_version_deltas(
        &self,
        start_id: HummockVersionId,
        num_limit: u32,
        committed_epoch_limit: HummockEpoch,
    ) -> Result<Vec<HummockVersionDelta>> {
        let req = ListVersionDeltasRequest {
            start_id,
            num_limit,
            committed_epoch_limit,
        };
        Ok(self
            .inner
            .list_version_deltas(req)
            .await?
            .version_deltas
            .unwrap()
            .version_deltas
            .iter()
            .map(HummockVersionDelta::from_rpc_protobuf)
            .collect())
    }

    pub async fn trigger_compaction_deterministic(
        &self,
        version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        let req = TriggerCompactionDeterministicRequest {
            version_id,
            compaction_groups,
        };
        self.inner.trigger_compaction_deterministic(req).await?;
        Ok(())
    }

    pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
        let req = DisableCommitEpochRequest {};
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .disable_commit_epoch(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
        let req = GetAssignedCompactTaskNumRequest {};
        let resp = self.inner.get_assigned_compact_task_num(req).await?;
        Ok(resp.num_tasks as usize)
    }

    pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
        let req = RiseCtlListCompactionGroupRequest {};
        let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
        Ok(resp.compaction_groups)
    }

    pub async fn risectl_update_compaction_config(
        &self,
        compaction_groups: &[CompactionGroupId],
        configs: &[MutableConfig],
    ) -> Result<()> {
        let req = RiseCtlUpdateCompactionConfigRequest {
            compaction_group_ids: compaction_groups.to_vec(),
            configs: configs
                .iter()
                .map(
                    |c| rise_ctl_update_compaction_config_request::MutableConfig {
                        mutable_config: Some(c.clone()),
                    },
                )
                .collect(),
        };
        let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
        Ok(())
    }

    pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
        let req = BackupMetaRequest { remarks };
        let resp = self.inner.backup_meta(req).await?;
        Ok(resp.job_id)
    }

    pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
        let req = GetBackupJobStatusRequest { job_id };
        let resp = self.inner.get_backup_job_status(req).await?;
        Ok((resp.job_status(), resp.message))
    }

    pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
        let req = DeleteMetaSnapshotRequest {
            snapshot_ids: snapshot_ids.to_vec(),
        };
        let _resp = self.inner.delete_meta_snapshot(req).await?;
        Ok(())
    }

    pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
        let req = GetMetaSnapshotManifestRequest {};
        let resp = self.inner.get_meta_snapshot_manifest(req).await?;
        Ok(resp.manifest.expect("should exist"))
    }

    pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
        let req = GetTelemetryInfoRequest {};
        let resp = self.inner.get_telemetry_info(req).await?;
        Ok(resp)
    }

    pub async fn get_meta_store_endpoint(&self) -> Result<String> {
        let req = GetMetaStoreInfoRequest {};
        let resp = self.inner.get_meta_store_info(req).await?;
        Ok(resp.meta_store_endpoint)
    }

    pub async fn alter_sink_props(
        &self,
        sink_id: SinkId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: sink_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Sink as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_iceberg_table_props(
        &self,
        table_id: TableId,
        sink_id: SinkId,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: table_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::IcebergTable as i32,
            extra_options: Some(ExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
                sink_id,
                source_id,
            })),
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_connector_props(
        &self,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        connector_conn_ref: Option<ConnectionId>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: source_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref,
            object_type: AlterConnectorPropsObject::Source as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_connection_connector_props(
        &self,
        connection_id: u32,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> Result<()> {
        let req = AlterConnectorPropsRequest {
            object_id: connection_id,
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            connector_conn_ref: None,
            object_type: AlterConnectorPropsObject::Connection as i32,
            extra_options: None,
        };
        let _resp = self.inner.alter_connector_props(req).await?;
        Ok(())
    }

    pub async fn alter_source_properties_safe(
        &self,
        source_id: SourceId,
        changed_props: BTreeMap<String, String>,
        changed_secret_refs: BTreeMap<String, PbSecretRef>,
        reset_splits: bool,
    ) -> Result<()> {
        let req = AlterSourcePropertiesSafeRequest {
            source_id: source_id.as_raw_id(),
            changed_props: changed_props.into_iter().collect(),
            changed_secret_refs: changed_secret_refs.into_iter().collect(),
            options: Some(PropertyUpdateOptions { reset_splits }),
        };
        let _resp = self.inner.alter_source_properties_safe(req).await?;
        Ok(())
    }

    pub async fn reset_source_splits(&self, source_id: SourceId) -> Result<()> {
        let req = ResetSourceSplitsRequest {
            source_id: source_id.as_raw_id(),
        };
        let _resp = self.inner.reset_source_splits(req).await?;
        Ok(())
    }

    pub async fn inject_source_offsets(
        &self,
        source_id: SourceId,
        split_offsets: HashMap<String, String>,
    ) -> Result<Vec<String>> {
        let req = InjectSourceOffsetsRequest {
            source_id: source_id.as_raw_id(),
            split_offsets,
        };
        let resp = self.inner.inject_source_offsets(req).await?;
        Ok(resp.applied_split_ids)
    }

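    /// Sets a system parameter on the meta service. Returns the updated
    /// system parameters if the response carries them.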
    pub async fn set_system_param(
        &self,
        param: String,
        value: Option<String>,
    ) -> Result<Option<SystemParamsReader>> {
        let req = SetSystemParamRequest { param, value };
        let resp = self.inner.set_system_param(req).await?;
        Ok(resp.params.map(SystemParamsReader::from))
    }

    pub async fn get_session_params(&self) -> Result<String> {
        let req = GetSessionParamsRequest {};
        let resp = self.inner.get_session_params(req).await?;
        Ok(resp.params)
    }

    pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
        let req = SetSessionParamRequest { param, value };
        let resp = self.inner.set_session_param(req).await?;
        Ok(resp.param)
    }

    pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
        let req = GetDdlProgressRequest {};
        let resp = self.inner.get_ddl_progress(req).await?;
        Ok(resp.ddl_progress)
    }

    pub async fn split_compaction_group(
        &self,
        group_id: CompactionGroupId,
        table_ids_to_new_group: &[TableId],
        partition_vnode_count: u32,
    ) -> Result<CompactionGroupId> {
        let req = SplitCompactionGroupRequest {
            group_id,
            table_ids: table_ids_to_new_group.to_vec(),
            partition_vnode_count,
        };
        let resp = self.inner.split_compaction_group(req).await?;
        Ok(resp.new_group_id)
    }

    pub async fn get_tables(
        &self,
        table_ids: Vec<TableId>,
        include_dropped_tables: bool,
    ) -> Result<HashMap<TableId, Table>> {
        let req = GetTablesRequest {
            table_ids,
            include_dropped_tables,
        };
        let resp = self.inner.get_tables(req).await?;
        Ok(resp.tables)
    }

    pub async fn list_serving_vnode_mappings(
        &self,
    ) -> Result<HashMap<FragmentId, (JobId, WorkerSlotMapping)>> {
        let req = GetServingVnodeMappingsRequest {};
        let resp = self.inner.get_serving_vnode_mappings(req).await?;
        let mappings = resp
            .worker_slot_mappings
            .into_iter()
            .map(|p| {
                (
                    p.fragment_id,
                    (
                        resp.fragment_to_table
                            .get(&p.fragment_id)
                            .cloned()
                            .unwrap_or_default(),
                        WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
                    ),
                )
            })
            .collect();
        Ok(mappings)
    }

    pub async fn risectl_list_compaction_status(
        &self,
    ) -> Result<(
        Vec<CompactStatus>,
        Vec<CompactTaskAssignment>,
        Vec<CompactTaskProgress>,
    )> {
        let req = RiseCtlListCompactionStatusRequest {};
        let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
        Ok((
            resp.compaction_statuses,
            resp.task_assignment,
            resp.task_progress,
        ))
    }

    pub async fn get_compaction_score(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Result<Vec<PickerInfo>> {
        let req = GetCompactionScoreRequest {
            compaction_group_id,
        };
        let resp = self.inner.get_compaction_score(req).await?;
        Ok(resp.scores)
    }

    pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
        let req = RiseCtlRebuildTableStatsRequest {};
        let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
        Ok(())
    }

    pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
        let req = ListBranchedObjectRequest {};
        let resp = self.inner.list_branched_object(req).await?;
        Ok(resp.branched_objects)
    }

    pub async fn list_active_write_limit(&self) -> Result<HashMap<CompactionGroupId, WriteLimit>> {
        let req = ListActiveWriteLimitRequest {};
        let resp = self.inner.list_active_write_limit(req).await?;
        Ok(resp.write_limits)
    }

    pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
        let req = ListHummockMetaConfigRequest {};
        let resp = self.inner.list_hummock_meta_config(req).await?;
        Ok(resp.configs)
    }

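    /// Fetches table change logs, optionally filtered by an inclusive epoch
    /// range, a set of table ids, an `exclude_empty` flag and a `limit`.
    /// Protobuf ids in the response are converted back to typed
    /// `TableId`/`TableChangeLog` values.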
    pub async fn get_table_change_logs(
        &self,
        epoch_only: bool,
        start_epoch_inclusive: Option<u64>,
        end_epoch_inclusive: Option<u64>,
        table_ids: Option<HashSet<TableId>>,
        exclude_empty: bool,
        limit: Option<u32>,
    ) -> Result<TableChangeLogs> {
        let req = GetTableChangeLogsRequest {
            epoch_only,
            start_epoch_inclusive,
            end_epoch_inclusive,
            table_ids: table_ids.map(|iter| PbTableFilter {
                table_ids: iter.into_iter().collect::<Vec<_>>(),
            }),
            exclude_empty,
            limit,
        };
        let resp = self.inner.get_table_change_logs(req).await?;
        Ok(resp
            .table_change_logs
            .into_iter()
            .map(|(id, change_log)| (TableId::new(id), TableChangeLog::from_protobuf(&change_log)))
            .collect())
    }

    pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
        let _resp = self
            .inner
            .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
            .await?;

        Ok(())
    }

    pub async fn rw_cloud_validate_source(
        &self,
        source_type: SourceType,
        source_config: HashMap<String, String>,
    ) -> Result<RwCloudValidateSourceResponse> {
        let req = RwCloudValidateSourceRequest {
            source_type: source_type.into(),
            source_config,
        };
        let resp = self.inner.rw_cloud_validate_source(req).await?;
        Ok(resp)
    }

    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
        self.inner.core.read().await.sink_coordinate_client.clone()
    }

    pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
        let req = ListCompactTaskAssignmentRequest {};
        let resp = self.inner.list_compact_task_assignment(req).await?;
        Ok(resp.task_assignment)
    }

    pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
        let req = ListEventLogRequest::default();
        let resp = self.inner.list_event_log(req).await?;
        Ok(resp.event_logs)
    }

    pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
        let req = ListCompactTaskProgressRequest {};
        let resp = self.inner.list_compact_task_progress(req).await?;
        Ok(resp.task_progress)
    }

    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
        // Panic reporting is a no-op under madsim (deterministic simulation).
    }

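    /// Reports a panic event to the meta service, blocking for at most
    /// `timeout_millis` (1000 ms by default). The report is sent from a
    /// dedicated thread running its own current-thread runtime, since the
    /// panic hook may fire outside any usable tokio runtime context.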
1829 #[cfg(not(madsim))]
1831 pub fn try_add_panic_event_blocking(
1832 &self,
1833 panic_info: impl Display,
1834 timeout_millis: Option<u64>,
1835 ) {
1836 let event = event_log::EventWorkerNodePanic {
1837 worker_id: self.worker_id,
1838 worker_type: self.worker_type.into(),
1839 host_addr: Some(self.host_addr.to_protobuf()),
1840 panic_info: format!("{panic_info}"),
1841 };
1842 let grpc_meta_client = self.inner.clone();
1843 let _ = thread::spawn(move || {
1844 let rt = tokio::runtime::Builder::new_current_thread()
1845 .enable_all()
1846 .build()
1847 .unwrap();
1848 let req = AddEventLogRequest {
1849 event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1850 };
1851 rt.block_on(async {
1852 let _ = tokio::time::timeout(
1853 Duration::from_millis(timeout_millis.unwrap_or(1000)),
1854 grpc_meta_client.add_event_log(req),
1855 )
1856 .await;
1857 });
1858 })
1859 .join();
1860 }
1861
1862 pub async fn add_sink_fail_evet(
1863 &self,
1864 sink_id: SinkId,
1865 sink_name: String,
1866 connector: String,
1867 error: String,
1868 ) -> Result<()> {
1869 let event = event_log::EventSinkFail {
1870 sink_id,
1871 sink_name,
1872 connector,
1873 error,
1874 };
1875 let req = AddEventLogRequest {
1876 event: Some(add_event_log_request::Event::SinkFail(event)),
1877 };
1878 self.inner.add_event_log(req).await?;
1879 Ok(())
1880 }

    pub async fn add_cdc_auto_schema_change_fail_event(
        &self,
        source_id: SourceId,
        table_name: String,
        cdc_table_id: String,
        upstream_ddl: String,
        fail_info: String,
    ) -> Result<()> {
        let event = event_log::EventAutoSchemaChangeFail {
            table_id: source_id.as_cdc_table_id(),
            table_name,
            cdc_table_id,
            upstream_ddl,
            fail_info,
        };
        let req = AddEventLogRequest {
            event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
        };
        self.inner.add_event_log(req).await?;
        Ok(())
    }

    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        let req = CancelCompactTaskRequest {
            task_id,
            task_status: task_status as _,
        };
        let resp = self.inner.cancel_compact_task(req).await?;
        Ok(resp.ret)
    }

    pub async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Result<PbHummockVersion> {
        let req = GetVersionByEpochRequest { epoch, table_id };
        let resp = self.inner.get_version_by_epoch(req).await?;
        Ok(resp.version.unwrap())
    }

    pub async fn get_cluster_limits(
        &self,
    ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
        let req = GetClusterLimitsRequest {};
        let resp = self.inner.get_cluster_limits(req).await?;
        Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
    }

    pub async fn merge_compaction_group(
        &self,
        left_group_id: CompactionGroupId,
        right_group_id: CompactionGroupId,
    ) -> Result<()> {
        let req = MergeCompactionGroupRequest {
            left_group_id,
            right_group_id,
        };
        self.inner.merge_compaction_group(req).await?;
        Ok(())
    }

    pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
        let request = ListRateLimitsRequest {};
        let resp = self.inner.list_rate_limits(request).await?;
        Ok(resp.rate_limits)
    }

    pub async fn list_cdc_progress(&self) -> Result<HashMap<JobId, PbCdcProgress>> {
        let request = ListCdcProgressRequest {};
        let resp = self.inner.list_cdc_progress(request).await?;
        Ok(resp.cdc_progress)
    }

    pub async fn list_refresh_table_states(&self) -> Result<Vec<RefreshTableState>> {
        let request = ListRefreshTableStatesRequest {};
        let resp = self.inner.list_refresh_table_states(request).await?;
        Ok(resp.states)
    }

    pub async fn list_iceberg_compaction_status(
        &self,
    ) -> Result<Vec<list_iceberg_compaction_status_response::IcebergCompactionStatus>> {
        let request = ListIcebergCompactionStatusRequest {};
        let resp = self.inner.list_iceberg_compaction_status(request).await?;
        Ok(resp.statuses)
    }

    pub async fn list_sink_log_store_tables(
        &self,
    ) -> Result<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
        let request = ListSinkLogStoreTablesRequest {};
        let resp = self.inner.list_sink_log_store_tables(request).await?;
        Ok(resp.tables)
    }

    pub async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<WaitVersion> {
        let request = CreateIcebergTableRequest {
            table_info: Some(table_job_info),
            sink_info: Some(sink_job_info),
            iceberg_source: Some(iceberg_source),
            if_not_exists,
        };

        let resp = Box::pin(self.inner.create_iceberg_table(request)).await?;
        Ok(resp
            .version
            .ok_or_else(|| anyhow!("wait version not set"))?)
    }

    pub async fn list_hosted_iceberg_tables(&self) -> Result<Vec<IcebergTable>> {
        let request = ListIcebergTablesRequest {};
        let resp = self.inner.list_iceberg_tables(request).await?;
        Ok(resp.iceberg_tables)
    }

    pub async fn get_cluster_stack_trace(
        &self,
        actor_traces_format: ActorTracesFormat,
    ) -> Result<StackTraceResponse> {
        let request = StackTraceRequest {
            actor_traces_format: actor_traces_format as i32,
        };
        let resp = self.inner.stack_trace(request).await?;
        Ok(resp)
    }

    pub async fn set_sync_log_store_aligned(&self, job_id: JobId, aligned: bool) -> Result<()> {
        let request = SetSyncLogStoreAlignedRequest { job_id, aligned };
        self.inner.set_sync_log_store_aligned(request).await?;
        Ok(())
    }

    pub async fn refresh(&self, request: RefreshRequest) -> Result<RefreshResponse> {
        self.inner.refresh(request).await
    }

    pub async fn list_unmigrated_tables(&self) -> Result<HashMap<TableId, String>> {
        let request = ListUnmigratedTablesRequest {};
        let resp = self.inner.list_unmigrated_tables(request).await?;
        Ok(resp
            .tables
            .into_iter()
            .map(|table| (table.table_id, table.table_name))
            .collect())
    }
}

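/// [`HummockMetaClient`] is implemented for [`MetaClient`] mostly by delegating
/// to the corresponding hummock RPCs on the shared gRPC channel. The notable
/// exception is `commit_epoch_with_change_log`, which panics: in production,
/// only the meta service itself may commit epochs.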
#[async_trait]
impl HummockMetaClient for MetaClient {
    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
        let req = UnpinVersionBeforeRequest {
            context_id: self.worker_id(),
            unpin_version_before,
        };
        self.inner.unpin_version_before(req).await?;
        Ok(())
    }

    async fn get_current_version(&self) -> Result<HummockVersion> {
        let req = GetCurrentVersionRequest::default();
        Ok(HummockVersion::from_rpc_protobuf(
            &self
                .inner
                .get_current_version(req)
                .await?
                .current_version
                .unwrap(),
        ))
    }

    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
        let resp = self
            .inner
            .get_new_object_ids(GetNewObjectIdsRequest { number })
            .await?;
        Ok(ObjectIdRange::new(resp.start_id, resp.end_id))
    }

    async fn commit_epoch_with_change_log(
        &self,
        _epoch: HummockEpoch,
        _sync_result: SyncResult,
        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
    ) -> Result<()> {
        panic!("Only meta service can commit_epoch in production.")
    }

    async fn trigger_manual_compaction(
        &self,
        compaction_group_id: CompactionGroupId,
        table_id: JobId,
        level: u32,
        sst_ids: Vec<HummockSstableId>,
        exclusive: bool,
    ) -> Result<bool> {
        let req = TriggerManualCompactionRequest {
            compaction_group_id,
            table_id,
            level,
            sst_ids,
            exclusive: Some(exclusive),
            ..Default::default()
        };

        let resp = self.inner.trigger_manual_compaction(req).await?;
        Ok(resp.should_retry.unwrap_or(false))
    }

    async fn trigger_full_gc(
        &self,
        sst_retention_time_sec: u64,
        prefix: Option<String>,
    ) -> Result<()> {
        self.inner
            .trigger_full_gc(TriggerFullGcRequest {
                sst_retention_time_sec,
                prefix,
            })
            .await?;
        Ok(())
    }

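    /// Opens the bidirectional compaction-event stream with the meta node.
    ///
    /// The handshake works as follows: an unbounded channel is created, a
    /// `Register` event carrying this worker's id is queued as the first
    /// request, and only then is the gRPC streaming call established over the
    /// queued requests. The caller gets back the request sender plus the
    /// response stream.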
    async fn subscribe_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeCompactionEventRequest>,
        BoxStream<'static, CompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeCompactionEventRequest>();
        request_sender
            .send(SubscribeCompactionEventRequest {
                event: Some(subscribe_compaction_event_request::Event::Register(
                    Register {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe compaction event")?;

        let stream = self
            .inner
            .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }

    async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: TableId,
    ) -> Result<PbHummockVersion> {
        self.get_version_by_epoch(epoch, table_id).await
    }

    async fn subscribe_iceberg_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
        BoxStream<'static, IcebergCompactionEventItem>,
    )> {
        let (request_sender, request_receiver) =
            unbounded_channel::<SubscribeIcebergCompactionEventRequest>();
        request_sender
            .send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::Register(
                    IcebergRegister {
                        context_id: self.worker_id(),
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            })
            .context("Failed to subscribe iceberg compaction event")?;

        let stream = self
            .inner
            .subscribe_iceberg_compaction_event(Request::new(UnboundedReceiverStream::new(
                request_receiver,
            )))
            .await?;

        Ok((request_sender, Box::pin(stream)))
    }

    async fn get_table_change_logs(
        &self,
        epoch_only: bool,
        start_epoch_inclusive: Option<u64>,
        end_epoch_inclusive: Option<u64>,
        table_ids: Option<HashSet<TableId>>,
        exclude_empty: bool,
        limit: Option<u32>,
    ) -> Result<TableChangeLogs> {
        self.get_table_change_logs(
            epoch_only,
            start_epoch_inclusive,
            end_epoch_inclusive,
            table_ids,
            exclude_empty,
            limit,
        )
        .await
    }
}

#[async_trait]
impl TelemetryInfoFetcher for MetaClient {
    async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
        let resp = self
            .get_telemetry_info()
            .await
            .map_err(|e| e.to_report_string())?;
        let tracking_id = resp.get_tracking_id().ok();
        Ok(tracking_id.map(|id| id.to_owned()))
    }
}

pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;

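/// One typed gRPC client per meta service, all sharing a single underlying
/// tonic [`Channel`]. When the meta leader changes, the whole core is swapped
/// out at once by rebuilding it on a fresh channel (see
/// `MetaMemberManagement::recreate_core`).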
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
    hosted_iceberg_catalog_service_client: HostedIcebergCatalogServiceClient<Channel>,
    monitor_client: MonitorServiceClient<Channel>,
}

impl GrpcMetaClientCore {
    pub(crate) fn new(channel: Channel) -> Self {
        let cluster_client = ClusterServiceClient::new(channel.clone());
        let meta_member_client = MetaMemberClient::new(channel.clone());
        let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
        // Clients whose responses can be arbitrarily large (catalogs, version
        // deltas, fragment plans, ...) lift the default decode-size limit.
        let ddl_client =
            DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let hummock_client =
            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let notification_client =
            NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let stream_client =
            StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let user_client = UserServiceClient::new(channel.clone());
        let scale_client =
            ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let backup_client = BackupServiceClient::new(channel.clone());
        let telemetry_client =
            TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let system_params_client = SystemParamsServiceClient::new(channel.clone());
        let session_params_client = SessionParamServiceClient::new(channel.clone());
        let serving_client = ServingServiceClient::new(channel.clone());
        let cloud_client = CloudServiceClient::new(channel.clone());
        let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone())
            .max_decoding_message_size(usize::MAX);
        let event_log_client = EventLogServiceClient::new(channel.clone());
        let cluster_limit_client = ClusterLimitServiceClient::new(channel.clone());
        let hosted_iceberg_catalog_service_client =
            HostedIcebergCatalogServiceClient::new(channel.clone());
        let monitor_client = MonitorServiceClient::new(channel);

        GrpcMetaClientCore {
            cluster_client,
            meta_member_client,
            heartbeat_client,
            ddl_client,
            hummock_client,
            notification_client,
            stream_client,
            user_client,
            scale_client,
            backup_client,
            telemetry_client,
            system_params_client,
            session_params_client,
            serving_client,
            cloud_client,
            sink_coordinate_client,
            event_log_client,
            cluster_limit_client,
            hosted_iceberg_catalog_service_client,
            monitor_client,
        }
    }
}

#[derive(Debug, Clone)]
struct GrpcMetaClient {
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    core: Arc<RwLock<GrpcMetaClientCore>>,
}

type MetaMemberClient = MetaMemberServiceClient<Channel>;

struct MetaMemberGroup {
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}

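/// Tracks meta-cluster membership and the current leader.
///
/// `members` is `Either::Left` when the client talks to a single
/// load-balanced endpoint, and `Either::Right` with an LRU cache of member
/// clients when an explicit address list was configured (see
/// `GrpcMetaClient::new`).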
struct MetaMemberManagement {
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    members: Either<MetaMemberClient, MetaMemberGroup>,
    current_leader: http::Uri,
    meta_config: Arc<MetaConfig>,
}

impl MetaMemberManagement {
    const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

    fn host_address_to_uri(addr: HostAddress) -> http::Uri {
        format!("http://{}:{}", addr.host, addr.port)
            .parse()
            .unwrap()
    }

    async fn recreate_core(&self, channel: Channel) {
        let mut core = self.core_ref.write().await;
        *core = GrpcMetaClientCore::new(channel);
    }

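    /// Refreshes the member list and fails over to a newly elected leader.
    ///
    /// In list mode this probes cached members one by one until a `Members`
    /// RPC succeeds, learning previously unseen members along the way. If the
    /// discovered leader differs from `current_leader`, a new channel is
    /// established (retrying for at most one leader-lease period) and every
    /// service client is rebuilt on top of it.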
    async fn refresh_members(&mut self) -> Result<()> {
        let leader_addr = match self.members.as_mut() {
            Either::Left(client) => {
                let resp = client
                    .to_owned()
                    .members(MembersRequest {})
                    .await
                    .map_err(RpcError::from_meta_status)?;
                let resp = resp.into_inner();
                resp.members.into_iter().find(|member| member.is_leader)
            }
            Either::Right(member_group) => {
                let mut fetched_members = None;

                // Try each known member until one answers; lazily create and
                // cache a client for members we have not talked to before.
                for (addr, client) in &mut member_group.members {
                    let members: Result<_> = try {
                        let mut client = match client {
                            Some(cached_client) => cached_client.to_owned(),
                            None => {
                                let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
                                let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
                                    .await
                                    .context("failed to create client")?;
                                let new_client: MetaMemberClient =
                                    MetaMemberServiceClient::new(channel);
                                *client = Some(new_client.clone());

                                new_client
                            }
                        };

                        let resp = client
                            .members(MembersRequest {})
                            .await
                            .context("failed to fetch members")?;

                        resp.into_inner().members
                    };

                    let fetched = members.is_ok();
                    fetched_members = Some(members);
                    if fetched {
                        break;
                    }
                }

                let members = fetched_members
                    .context("no member available in the list")?
                    .context("could not refresh members")?;

                // Pick out the leader and remember any newly joined members.
                let mut leader = None;
                for member in members {
                    if member.is_leader {
                        leader = Some(member.clone());
                    }

                    let addr = Self::host_address_to_uri(member.address.unwrap());
                    if !member_group.members.contains(&addr) {
                        tracing::info!("new meta member joined: {}", addr);
                        member_group.members.put(addr, None);
                    }
                }

                leader
            }
        };

        if let Some(leader) = leader_addr {
            let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());

            if discovered_leader != self.current_leader {
                tracing::info!("new meta leader {} discovered", discovered_leader);

                let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
                    Duration::from_secs(self.meta_config.meta_leader_lease_secs),
                    false,
                );

                let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
                    let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
                    GrpcMetaClient::connect_to_endpoint(endpoint).await
                })
                .await?;

                self.recreate_core(channel).await;
                self.current_leader = discovered_leader;
            }
        }

        Ok(())
    }
}

impl GrpcMetaClient {
    const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
    const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
    const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
    const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;

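    /// Spawns the background task that keeps the member list and leader up to
    /// date.
    ///
    /// In list mode the task wakes up every `META_MEMBER_REFRESH_PERIOD` as
    /// well as on demand; in load-balance mode it only refreshes on demand,
    /// since the single endpoint is expected to route to the leader.
    /// On-demand refreshes arrive through `force_refresh_receiver` together
    /// with a oneshot sender that receives the refresh result.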
    fn start_meta_member_monitor(
        &self,
        init_leader_addr: http::Uri,
        members: Either<MetaMemberClient, MetaMemberGroup>,
        force_refresh_receiver: Receiver<Sender<Result<()>>>,
        meta_config: Arc<MetaConfig>,
    ) -> Result<()> {
        let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
        let current_leader = init_leader_addr;

        let enable_period_tick = matches!(members, Either::Right(_));

        let member_management = MetaMemberManagement {
            core_ref,
            members,
            current_leader,
            meta_config,
        };

        let mut force_refresh_receiver = force_refresh_receiver;

        tokio::spawn(async move {
            let mut member_management = member_management;
            let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);

            loop {
                let event: Option<Sender<Result<()>>> = if enable_period_tick {
                    tokio::select! {
                        _ = ticker.tick() => None,
                        result_sender = force_refresh_receiver.recv() => {
                            if result_sender.is_none() {
                                break;
                            }

                            result_sender
                        },
                    }
                } else {
                    let result_sender = force_refresh_receiver.recv().await;

                    if result_sender.is_none() {
                        break;
                    }

                    result_sender
                };

                let tick_result = member_management.refresh_members().await;
                if let Err(e) = tick_result.as_ref() {
                    tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
                }

                if let Some(sender) = event {
                    let _resp = sender.send(tick_result);
                }
            }
        });

        Ok(())
    }

    async fn force_refresh_leader(&self) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        self.member_monitor_event_sender
            .send(sender)
            .await
            .map_err(|e| anyhow!(e))?;

        receiver.await.map_err(|e| anyhow!(e))?
    }

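    /// Connects to the meta cluster according to `strategy` and bootstraps
    /// the member monitor, forcing one leader refresh before returning so the
    /// client starts out pointed at the current leader.
    ///
    /// A minimal usage sketch (editor's addition; the concrete address and
    /// the default config are hypothetical):
    ///
    /// ```ignore
    /// let strategy = MetaAddressStrategy::List(vec!["http://meta-1:5690".parse()?]);
    /// let client = GrpcMetaClient::new(&strategy, Arc::new(MetaConfig::default())).await?;
    /// ```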
    pub async fn new(strategy: &MetaAddressStrategy, config: Arc<MetaConfig>) -> Result<Self> {
        let (channel, addr) = match strategy {
            MetaAddressStrategy::LoadBalance(addr) => {
                Self::try_build_rpc_channel(vec![addr.clone()]).await
            }
            MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
        }?;
        let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
        let client = GrpcMetaClient {
            member_monitor_event_sender: force_refresh_sender,
            core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
        };

        let meta_member_client = client.core.read().await.meta_member_client.clone();
        let members = match strategy {
            MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
            MetaAddressStrategy::List(addrs) => {
                let mut members = LruCache::new(20.try_into().unwrap());
                for addr in addrs {
                    members.put(addr.clone(), None);
                }
                members.put(addr.clone(), Some(meta_member_client));

                Either::Right(MetaMemberGroup { members })
            }
        };

        client.start_meta_member_monitor(addr, members, force_refresh_receiver, config.clone())?;

        client.force_refresh_leader().await?;

        Ok(client)
    }

    fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
        Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
    }

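    /// Tries the given addresses in order and returns the first channel that
    /// connects, together with the address it connected to. If every address
    /// fails, only the last connection error is surfaced; an empty address
    /// list is reported as its own error.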
    pub(crate) async fn try_build_rpc_channel(
        addrs: impl IntoIterator<Item = http::Uri>,
    ) -> Result<(Channel, http::Uri)> {
        let endpoints: Vec<_> = addrs
            .into_iter()
            .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
            .collect();

        let mut last_error = None;

        for (endpoint, addr) in endpoints {
            match Self::connect_to_endpoint(endpoint).await {
                Ok(channel) => {
                    tracing::info!("Connected to meta server {} successfully", addr);
                    return Ok((channel, addr));
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to connect to meta server {}, trying the next address",
                        addr,
                    );
                    last_error = Some(e);
                }
            }
        }

        if let Some(last_error) = last_error {
            Err(anyhow::anyhow!(last_error)
                .context("failed to connect to all meta servers")
                .into())
        } else {
            bail!("no meta server address provided")
        }
    }

    async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
        let channel = endpoint
            .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-meta-client", Default::default())
            .await?
            .wrapped();

        Ok(channel)
    }

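    /// Builds a jittered exponential-backoff iterator whose *total* sleep time
    /// stays within `high_bound`. With `exceed == false` the cumulative delay
    /// must remain strictly below the bound; with `exceed == true` one final
    /// step past the bound is allowed.
    ///
    /// A sketch of how it composes with `tokio_retry` (editor's addition;
    /// `try_connect` is a hypothetical fallible async operation):
    ///
    /// ```ignore
    /// let strategy = GrpcMetaClient::retry_strategy_to_bound(Duration::from_secs(30), false);
    /// let channel = tokio_retry::Retry::spawn(strategy, || try_connect()).await?;
    /// ```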
    pub(crate) fn retry_strategy_to_bound(
        high_bound: Duration,
        exceed: bool,
    ) -> impl Iterator<Item = Duration> {
        let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
            .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
            .map(jitter);

        let mut sum = Duration::default();

        iter.take_while(move |duration| {
            sum += *duration;

            // `exceed` permits the last interval to step over the bound;
            // otherwise the cumulative delay stays strictly below it.
            if exceed {
                sum < high_bound + *duration
            } else {
                sum < high_bound
            }
        })
    }
}

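/// Declares every meta RPC as a `{ client_field, method, Request, Response }`
/// row and hands the whole table to the given macro. `GrpcMetaClient` feeds it
/// `meta_rpc_client_method_impl` (defined elsewhere in this crate) to generate
/// one delegating async method per row, of roughly this shape (editor's
/// sketch, not the literal expansion):
///
/// ```ignore
/// pub async fn add_worker_node(
///     &self,
///     request: AddWorkerNodeRequest,
/// ) -> Result<AddWorkerNodeResponse> {
///     let mut client = self.core.read().await.cluster_client.clone();
///     // ... invoke the RPC, refreshing the leader on certain error codes ...
/// }
/// ```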
macro_rules! for_all_meta_rpc {
    ($macro:ident) => {
        $macro! {
            { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
            ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
            ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
            ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
            ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
            ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
            ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
            ,{ stream_client, flush, FlushRequest, FlushResponse }
            ,{ stream_client, pause, PauseRequest, PauseResponse }
            ,{ stream_client, resume, ResumeRequest, ResumeResponse }
            ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
            ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
            ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
            ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
            ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
            ,{ stream_client, list_creating_fragment_distribution, ListCreatingFragmentDistributionRequest, ListCreatingFragmentDistributionResponse }
            ,{ stream_client, list_sink_log_store_tables, ListSinkLogStoreTablesRequest, ListSinkLogStoreTablesResponse }
            ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
            ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
            ,{ stream_client, recover, RecoverRequest, RecoverResponse }
            ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
            ,{ stream_client, list_cdc_progress, ListCdcProgressRequest, ListCdcProgressResponse }
            ,{ stream_client, list_refresh_table_states, ListRefreshTableStatesRequest, ListRefreshTableStatesResponse }
            ,{ stream_client, list_iceberg_compaction_status, ListIcebergCompactionStatusRequest, ListIcebergCompactionStatusResponse }
            ,{ stream_client, alter_connector_props, AlterConnectorPropsRequest, AlterConnectorPropsResponse }
            ,{ stream_client, alter_source_properties_safe, AlterSourcePropertiesSafeRequest, AlterSourcePropertiesSafeResponse }
            ,{ stream_client, reset_source_splits, ResetSourceSplitsRequest, ResetSourceSplitsResponse }
            ,{ stream_client, inject_source_offsets, InjectSourceOffsetsRequest, InjectSourceOffsetsResponse }
            ,{ stream_client, get_fragment_by_id, GetFragmentByIdRequest, GetFragmentByIdResponse }
            ,{ stream_client, get_fragment_vnodes, GetFragmentVnodesRequest, GetFragmentVnodesResponse }
            ,{ stream_client, get_actor_vnodes, GetActorVnodesRequest, GetActorVnodesResponse }
            ,{ stream_client, set_sync_log_store_aligned, SetSyncLogStoreAlignedRequest, SetSyncLogStoreAlignedResponse }
            ,{ stream_client, refresh, RefreshRequest, RefreshResponse }
            ,{ stream_client, list_unmigrated_tables, ListUnmigratedTablesRequest, ListUnmigratedTablesResponse }
            ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
            ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
            ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
            ,{ ddl_client, alter_subscription_retention, AlterSubscriptionRetentionRequest, AlterSubscriptionRetentionResponse }
            ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
            ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
            ,{ ddl_client, alter_backfill_parallelism, AlterBackfillParallelismRequest, AlterBackfillParallelismResponse }
            ,{ ddl_client, alter_streaming_job_config, AlterStreamingJobConfigRequest, AlterStreamingJobConfigResponse }
            ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
            ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
            ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
            ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }
            ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
            ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
            ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
            ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
            ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
            ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
            ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
            ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
            ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
            ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
            ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
            ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
            ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
            ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
            ,{ ddl_client, reset_source, ResetSourceRequest, ResetSourceResponse }
            ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse }
            ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
            ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
            ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
            ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
            ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
            ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
            ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
            ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
            ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
            ,{ ddl_client, risectl_resume_backfill, RisectlResumeBackfillRequest, RisectlResumeBackfillResponse }
            ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
            ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
            ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
            ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
            ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
            ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
            ,{ ddl_client, wait, WaitRequest, WaitResponse }
            ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
            ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
            ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
            ,{ ddl_client, compact_iceberg_table, CompactIcebergTableRequest, CompactIcebergTableResponse }
            ,{ ddl_client, expire_iceberg_table_snapshots, ExpireIcebergTableSnapshotsRequest, ExpireIcebergTableSnapshotsResponse }
            ,{ ddl_client, create_iceberg_table, CreateIcebergTableRequest, CreateIcebergTableResponse }
            ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
            ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
            ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
            ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
            ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
            ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
            ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
            ,{ hummock_client, get_new_object_ids, GetNewObjectIdsRequest, GetNewObjectIdsResponse }
            ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
            ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
            ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
            ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
            ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
            ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
            ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
            ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
            ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
            ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
            ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
            ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
            ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
            ,{ hummock_client, subscribe_iceberg_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeIcebergCompactionEventRequest>, Streaming<SubscribeIcebergCompactionEventResponse> }
            ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
            ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
            ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
            ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
            ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
            ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse }
            ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
            ,{ hummock_client, get_table_change_logs, GetTableChangeLogsRequest, GetTableChangeLogsResponse }
            ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
            ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
            ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
            ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
            ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
            ,{ user_client, alter_default_privilege, AlterDefaultPrivilegeRequest, AlterDefaultPrivilegeResponse }
            ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
            ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
            ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
            ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
            ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
            ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse }
            ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse }
            ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse }
            ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
            ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
            ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
            ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
            ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
            ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
            ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
            ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
            ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
            ,{ hosted_iceberg_catalog_service_client, list_iceberg_tables, ListIcebergTablesRequest, ListIcebergTablesResponse }
            ,{ monitor_client, stack_trace, StackTraceRequest, StackTraceResponse }
        }
    };
}

impl GrpcMetaClient {
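    /// Kicks off a leader refresh when an RPC fails with a status code that
    /// suggests the current endpoint is stale (`Unknown`, `Unimplemented`, or
    /// `Unavailable`). If the monitor's channel is full, another refresh is
    /// already in flight and this call simply skips its own.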
    async fn refresh_client_if_needed(&self, code: Code) {
        if matches!(
            code,
            Code::Unknown | Code::Unimplemented | Code::Unavailable
        ) {
            tracing::debug!("RPC failed with tonic code {}, triggering meta member refresh", code);
            let (result_sender, result_receiver) = oneshot::channel();
            if self
                .member_monitor_event_sender
                .try_send(result_sender)
                .is_ok()
            {
                if let Ok(Err(e)) = result_receiver.await {
                    tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
                }
            } else {
                tracing::debug!("skipping this refresh, another one is already in progress")
            }
        }
    }
}

impl GrpcMetaClient {
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}