1use std::collections::{HashMap, HashSet};
16use std::fmt::{Debug, Display};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering::Relaxed;
20use std::thread;
21use std::time::{Duration, SystemTime};
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use cluster_limit_service_client::ClusterLimitServiceClient;
26use either::Either;
27use futures::stream::BoxStream;
28use list_rate_limits_response::RateLimitInfo;
29use lru::LruCache;
30use replace_job_plan::ReplaceJob;
31use risingwave_common::RW_VERSION;
32use risingwave_common::catalog::{FunctionId, IndexId, ObjectId, SecretId, TableId};
33use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, MetaConfig};
34use risingwave_common::hash::WorkerSlotMapping;
35use risingwave_common::monitor::EndpointExt;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::telemetry::report::TelemetryInfoFetcher;
38use risingwave_common::util::addr::HostAddr;
39use risingwave_common::util::column_index_mapping::ColIndexMapping;
40use risingwave_common::util::meta_addr::MetaAddressStrategy;
41use risingwave_common::util::resource_util::cpu::total_cpu_available;
42use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
43use risingwave_error::bail;
44use risingwave_error::tonic::ErrorIsFromTonicServerImpl;
45use risingwave_hummock_sdk::compaction_group::StateTableId;
46use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
47use risingwave_hummock_sdk::{
48 CompactionGroupId, HummockEpoch, HummockVersionId, SstObjectIdRange, SyncResult,
49};
50use risingwave_pb::backup_service::backup_service_client::BackupServiceClient;
51use risingwave_pb::backup_service::*;
52use risingwave_pb::catalog::{
53 Connection, PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
54 PbSubscription, PbTable, PbView, Table,
55};
56use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
57use risingwave_pb::cloud_service::*;
58use risingwave_pb::common::worker_node::Property;
59use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
60use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
61use risingwave_pb::ddl_service::alter_owner_request::Object;
62use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
63use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
64use risingwave_pb::ddl_service::drop_table_request::SourceId;
65use risingwave_pb::ddl_service::*;
66use risingwave_pb::hummock::compact_task::TaskStatus;
67use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
68use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
69use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
70use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
71use risingwave_pb::hummock::write_limits::WriteLimit;
72use risingwave_pb::hummock::*;
73use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
74use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
75use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
76use risingwave_pb::meta::heartbeat_service_client::HeartbeatServiceClient;
77use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
78use risingwave_pb::meta::list_actor_states_response::ActorState;
79use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
80use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
81use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
82use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
83use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
84use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
85use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
86use risingwave_pb::meta::serving_service_client::ServingServiceClient;
87use risingwave_pb::meta::session_param_service_client::SessionParamServiceClient;
88use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
89use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
90use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
91use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
92use risingwave_pb::meta::*;
93use risingwave_pb::stream_plan::StreamFragmentGraph;
94use risingwave_pb::user::update_user_request::UpdateField;
95use risingwave_pb::user::user_service_client::UserServiceClient;
96use risingwave_pb::user::*;
97use thiserror_ext::AsReport;
98use tokio::sync::mpsc::{Receiver, UnboundedSender, unbounded_channel};
99use tokio::sync::oneshot::Sender;
100use tokio::sync::{RwLock, mpsc, oneshot};
101use tokio::task::JoinHandle;
102use tokio::time::{self};
103use tokio_retry::strategy::{ExponentialBackoff, jitter};
104use tokio_stream::wrappers::UnboundedReceiverStream;
105use tonic::transport::Endpoint;
106use tonic::{Code, Request, Streaming};
107
108use crate::channel::{Channel, WrappedChannelExt};
109use crate::error::{Result, RpcError};
110use crate::hummock_meta_client::{
111 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
112};
113use crate::meta_rpc_client_method_impl;
114
/// Identifier of a connection, as sent in `DropConnectionRequest`.
type ConnectionId = u32;
/// Identifier of a database, as sent in `DropDatabaseRequest` and `FlushRequest`.
type DatabaseId = u32;
/// Identifier of a schema, as sent in `DropSchemaRequest`.
type SchemaId = u32;
118
119#[derive(Clone, Debug)]
121pub struct MetaClient {
122 worker_id: u32,
123 worker_type: WorkerType,
124 host_addr: HostAddr,
125 inner: GrpcMetaClient,
126 meta_config: MetaConfig,
127 cluster_id: String,
128 shutting_down: Arc<AtomicBool>,
129}
130
131impl MetaClient {
132 pub fn worker_id(&self) -> u32 {
133 self.worker_id
134 }
135
136 pub fn host_addr(&self) -> &HostAddr {
137 &self.host_addr
138 }
139
140 pub fn worker_type(&self) -> WorkerType {
141 self.worker_type
142 }
143
144 pub fn cluster_id(&self) -> &str {
145 &self.cluster_id
146 }
147
148 pub async fn subscribe(
150 &self,
151 subscribe_type: SubscribeType,
152 ) -> Result<Streaming<SubscribeResponse>> {
153 let request = SubscribeRequest {
154 subscribe_type: subscribe_type as i32,
155 host: Some(self.host_addr.to_protobuf()),
156 worker_id: self.worker_id(),
157 };
158
159 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
160 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
161 true,
162 );
163
164 tokio_retry::Retry::spawn(retry_strategy, || async {
165 let request = request.clone();
166 self.inner.subscribe(request).await
167 })
168 .await
169 }
170
171 pub async fn create_connection(
172 &self,
173 connection_name: String,
174 database_id: u32,
175 schema_id: u32,
176 owner_id: u32,
177 req: create_connection_request::Payload,
178 ) -> Result<WaitVersion> {
179 let request = CreateConnectionRequest {
180 name: connection_name,
181 database_id,
182 schema_id,
183 owner_id,
184 payload: Some(req),
185 };
186 let resp = self.inner.create_connection(request).await?;
187 Ok(resp
188 .version
189 .ok_or_else(|| anyhow!("wait version not set"))?)
190 }
191
192 pub async fn create_secret(
193 &self,
194 secret_name: String,
195 database_id: u32,
196 schema_id: u32,
197 owner_id: u32,
198 value: Vec<u8>,
199 ) -> Result<WaitVersion> {
200 let request = CreateSecretRequest {
201 name: secret_name,
202 database_id,
203 schema_id,
204 owner_id,
205 value,
206 };
207 let resp = self.inner.create_secret(request).await?;
208 Ok(resp
209 .version
210 .ok_or_else(|| anyhow!("wait version not set"))?)
211 }
212
213 pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
214 let request = ListConnectionsRequest {};
215 let resp = self.inner.list_connections(request).await?;
216 Ok(resp.connections)
217 }
218
219 pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result<WaitVersion> {
220 let request = DropConnectionRequest { connection_id };
221 let resp = self.inner.drop_connection(request).await?;
222 Ok(resp
223 .version
224 .ok_or_else(|| anyhow!("wait version not set"))?)
225 }
226
227 pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
228 let request = DropSecretRequest {
229 secret_id: secret_id.into(),
230 };
231 let resp = self.inner.drop_secret(request).await?;
232 Ok(resp
233 .version
234 .ok_or_else(|| anyhow!("wait version not set"))?)
235 }
236
237 pub async fn register_new(
241 addr_strategy: MetaAddressStrategy,
242 worker_type: WorkerType,
243 addr: &HostAddr,
244 property: Property,
245 meta_config: &MetaConfig,
246 ) -> (Self, SystemParamsReader) {
247 let ret =
248 Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
249
250 match ret {
251 Ok(ret) => ret,
252 Err(err) => {
253 tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
254 std::process::exit(1);
255 }
256 }
257 }
258
259 async fn register_new_inner(
260 addr_strategy: MetaAddressStrategy,
261 worker_type: WorkerType,
262 addr: &HostAddr,
263 property: Property,
264 meta_config: &MetaConfig,
265 ) -> Result<(Self, SystemParamsReader)> {
266 tracing::info!("register meta client using strategy: {}", addr_strategy);
267
268 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
270 Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64),
271 true,
272 );
273
274 if property.is_unschedulable {
275 tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
276 }
277 let init_result: Result<_> = tokio_retry::RetryIf::spawn(
278 retry_strategy,
279 || async {
280 let grpc_meta_client =
281 GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
282
283 let add_worker_resp = grpc_meta_client
284 .add_worker_node(AddWorkerNodeRequest {
285 worker_type: worker_type as i32,
286 host: Some(addr.to_protobuf()),
287 property: Some(property.clone()),
288 resource: Some(risingwave_pb::common::worker_node::Resource {
289 rw_version: RW_VERSION.to_owned(),
290 total_memory_bytes: system_memory_available_bytes() as _,
291 total_cpu_cores: total_cpu_available() as _,
292 }),
293 })
294 .await
295 .context("failed to add worker node")?;
296
297 let system_params_resp = grpc_meta_client
298 .get_system_params(GetSystemParamsRequest {})
299 .await
300 .context("failed to get initial system params")?;
301
302 Ok((add_worker_resp, system_params_resp, grpc_meta_client))
303 },
304 |e: &RpcError| !e.is_from_tonic_server_impl(),
307 )
308 .await;
309
310 let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
311 let worker_id = add_worker_resp
312 .node_id
313 .expect("AddWorkerNodeResponse::node_id is empty");
314
315 let meta_client = Self {
316 worker_id,
317 worker_type,
318 host_addr: addr.clone(),
319 inner: grpc_meta_client,
320 meta_config: meta_config.to_owned(),
321 cluster_id: add_worker_resp.cluster_id,
322 shutting_down: Arc::new(false.into()),
323 };
324
325 static REPORT_PANIC: std::sync::Once = std::sync::Once::new();
326 REPORT_PANIC.call_once(|| {
327 let meta_client_clone = meta_client.clone();
328 std::panic::update_hook(move |default_hook, info| {
329 meta_client_clone.try_add_panic_event_blocking(info, None);
331 default_hook(info);
332 });
333 });
334
335 Ok((meta_client, system_params_resp.params.unwrap().into()))
336 }
337
338 pub async fn activate(&self, addr: &HostAddr) -> Result<()> {
340 let request = ActivateWorkerNodeRequest {
341 host: Some(addr.to_protobuf()),
342 node_id: self.worker_id,
343 };
344 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
345 Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64),
346 true,
347 );
348 tokio_retry::Retry::spawn(retry_strategy, || async {
349 let request = request.clone();
350 self.inner.activate_worker_node(request).await
351 })
352 .await?;
353
354 Ok(())
355 }
356
357 pub async fn send_heartbeat(&self, node_id: u32) -> Result<()> {
359 let request = HeartbeatRequest { node_id };
360 let resp = self.inner.heartbeat(request).await?;
361 if let Some(status) = resp.status {
362 if status.code() == risingwave_pb::common::status::Code::UnknownWorker {
363 if !self.shutting_down.load(Relaxed) {
366 tracing::error!(message = status.message, "worker expired");
367 std::process::exit(1);
368 }
369 }
370 }
371 Ok(())
372 }
373
374 pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
375 let request = CreateDatabaseRequest { db: Some(db) };
376 let resp = self.inner.create_database(request).await?;
377 Ok(resp
379 .version
380 .ok_or_else(|| anyhow!("wait version not set"))?)
381 }
382
383 pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
384 let request = CreateSchemaRequest {
385 schema: Some(schema),
386 };
387 let resp = self.inner.create_schema(request).await?;
388 Ok(resp
390 .version
391 .ok_or_else(|| anyhow!("wait version not set"))?)
392 }
393
394 pub async fn create_materialized_view(
395 &self,
396 table: PbTable,
397 graph: StreamFragmentGraph,
398 dependencies: HashSet<ObjectId>,
399 specific_resource_group: Option<String>,
400 ) -> Result<WaitVersion> {
401 let request = CreateMaterializedViewRequest {
402 materialized_view: Some(table),
403 fragment_graph: Some(graph),
404 backfill: PbBackfillType::Regular as _,
405 dependencies: dependencies.into_iter().collect(),
406 specific_resource_group,
407 };
408 let resp = self.inner.create_materialized_view(request).await?;
409 Ok(resp
411 .version
412 .ok_or_else(|| anyhow!("wait version not set"))?)
413 }
414
415 pub async fn drop_materialized_view(
416 &self,
417 table_id: TableId,
418 cascade: bool,
419 ) -> Result<WaitVersion> {
420 let request = DropMaterializedViewRequest {
421 table_id: table_id.table_id(),
422 cascade,
423 };
424
425 let resp = self.inner.drop_materialized_view(request).await?;
426 Ok(resp
427 .version
428 .ok_or_else(|| anyhow!("wait version not set"))?)
429 }
430
431 pub async fn create_source(
432 &self,
433 source: PbSource,
434 graph: Option<StreamFragmentGraph>,
435 ) -> Result<WaitVersion> {
436 let request = CreateSourceRequest {
437 source: Some(source),
438 fragment_graph: graph,
439 };
440
441 let resp = self.inner.create_source(request).await?;
442 Ok(resp
443 .version
444 .ok_or_else(|| anyhow!("wait version not set"))?)
445 }
446
447 pub async fn create_sink(
448 &self,
449 sink: PbSink,
450 graph: StreamFragmentGraph,
451 affected_table_change: Option<ReplaceJobPlan>,
452 dependencies: HashSet<ObjectId>,
453 ) -> Result<WaitVersion> {
454 let request = CreateSinkRequest {
455 sink: Some(sink),
456 fragment_graph: Some(graph),
457 affected_table_change,
458 dependencies: dependencies.into_iter().collect(),
459 };
460
461 let resp = self.inner.create_sink(request).await?;
462 Ok(resp
463 .version
464 .ok_or_else(|| anyhow!("wait version not set"))?)
465 }
466
467 pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
468 let request = CreateSubscriptionRequest {
469 subscription: Some(subscription),
470 };
471
472 let resp = self.inner.create_subscription(request).await?;
473 Ok(resp
474 .version
475 .ok_or_else(|| anyhow!("wait version not set"))?)
476 }
477
478 pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
479 let request = CreateFunctionRequest {
480 function: Some(function),
481 };
482 let resp = self.inner.create_function(request).await?;
483 Ok(resp
484 .version
485 .ok_or_else(|| anyhow!("wait version not set"))?)
486 }
487
488 pub async fn create_table(
489 &self,
490 source: Option<PbSource>,
491 table: PbTable,
492 graph: StreamFragmentGraph,
493 job_type: PbTableJobType,
494 ) -> Result<WaitVersion> {
495 let request = CreateTableRequest {
496 materialized_view: Some(table),
497 fragment_graph: Some(graph),
498 source,
499 job_type: job_type as _,
500 };
501 let resp = self.inner.create_table(request).await?;
502 Ok(resp
504 .version
505 .ok_or_else(|| anyhow!("wait version not set"))?)
506 }
507
508 pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
509 let request = CommentOnRequest {
510 comment: Some(comment),
511 };
512 let resp = self.inner.comment_on(request).await?;
513 Ok(resp
514 .version
515 .ok_or_else(|| anyhow!("wait version not set"))?)
516 }
517
518 pub async fn alter_name(
519 &self,
520 object: alter_name_request::Object,
521 name: &str,
522 ) -> Result<WaitVersion> {
523 let request = AlterNameRequest {
524 object: Some(object),
525 new_name: name.to_owned(),
526 };
527 let resp = self.inner.alter_name(request).await?;
528 Ok(resp
529 .version
530 .ok_or_else(|| anyhow!("wait version not set"))?)
531 }
532
533 pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
534 let request = AlterOwnerRequest {
535 object: Some(object),
536 owner_id,
537 };
538 let resp = self.inner.alter_owner(request).await?;
539 Ok(resp
540 .version
541 .ok_or_else(|| anyhow!("wait version not set"))?)
542 }
543
544 pub async fn alter_set_schema(
545 &self,
546 object: alter_set_schema_request::Object,
547 new_schema_id: u32,
548 ) -> Result<WaitVersion> {
549 let request = AlterSetSchemaRequest {
550 new_schema_id,
551 object: Some(object),
552 };
553 let resp = self.inner.alter_set_schema(request).await?;
554 Ok(resp
555 .version
556 .ok_or_else(|| anyhow!("wait version not set"))?)
557 }
558
559 pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
560 let request = AlterSourceRequest {
561 source: Some(source),
562 };
563 let resp = self.inner.alter_source(request).await?;
564 Ok(resp
565 .version
566 .ok_or_else(|| anyhow!("wait version not set"))?)
567 }
568
569 pub async fn alter_parallelism(
570 &self,
571 table_id: u32,
572 parallelism: PbTableParallelism,
573 deferred: bool,
574 ) -> Result<()> {
575 let request = AlterParallelismRequest {
576 table_id,
577 parallelism: Some(parallelism),
578 deferred,
579 };
580
581 self.inner.alter_parallelism(request).await?;
582 Ok(())
583 }
584
585 pub async fn alter_resource_group(
586 &self,
587 table_id: u32,
588 resource_group: Option<String>,
589 deferred: bool,
590 ) -> Result<()> {
591 let request = AlterResourceGroupRequest {
592 table_id,
593 resource_group,
594 deferred,
595 };
596
597 self.inner.alter_resource_group(request).await?;
598 Ok(())
599 }
600
601 pub async fn alter_swap_rename(
602 &self,
603 object: alter_swap_rename_request::Object,
604 ) -> Result<WaitVersion> {
605 let request = AlterSwapRenameRequest {
606 object: Some(object),
607 };
608 let resp = self.inner.alter_swap_rename(request).await?;
609 Ok(resp
610 .version
611 .ok_or_else(|| anyhow!("wait version not set"))?)
612 }
613
614 pub async fn alter_secret(
615 &self,
616 secret_id: u32,
617 secret_name: String,
618 database_id: u32,
619 schema_id: u32,
620 owner_id: u32,
621 value: Vec<u8>,
622 ) -> Result<WaitVersion> {
623 let request = AlterSecretRequest {
624 secret_id,
625 name: secret_name,
626 database_id,
627 schema_id,
628 owner_id,
629 value,
630 };
631 let resp = self.inner.alter_secret(request).await?;
632 Ok(resp
633 .version
634 .ok_or_else(|| anyhow!("wait version not set"))?)
635 }
636
637 pub async fn replace_job(
638 &self,
639 graph: StreamFragmentGraph,
640 table_col_index_mapping: ColIndexMapping,
641 replace_job: ReplaceJob,
642 ) -> Result<WaitVersion> {
643 let request = ReplaceJobPlanRequest {
644 plan: Some(ReplaceJobPlan {
645 fragment_graph: Some(graph),
646 table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()),
647 replace_job: Some(replace_job),
648 }),
649 };
650 let resp = self.inner.replace_job_plan(request).await?;
651 Ok(resp
653 .version
654 .ok_or_else(|| anyhow!("wait version not set"))?)
655 }
656
657 pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
658 let request = AutoSchemaChangeRequest {
659 schema_change: Some(schema_change),
660 };
661 let _ = self.inner.auto_schema_change(request).await?;
662 Ok(())
663 }
664
665 pub async fn create_view(&self, view: PbView) -> Result<WaitVersion> {
666 let request = CreateViewRequest { view: Some(view) };
667 let resp = self.inner.create_view(request).await?;
668 Ok(resp
670 .version
671 .ok_or_else(|| anyhow!("wait version not set"))?)
672 }
673
674 pub async fn create_index(
675 &self,
676 index: PbIndex,
677 table: PbTable,
678 graph: StreamFragmentGraph,
679 ) -> Result<WaitVersion> {
680 let request = CreateIndexRequest {
681 index: Some(index),
682 index_table: Some(table),
683 fragment_graph: Some(graph),
684 };
685 let resp = self.inner.create_index(request).await?;
686 Ok(resp
688 .version
689 .ok_or_else(|| anyhow!("wait version not set"))?)
690 }
691
692 pub async fn drop_table(
693 &self,
694 source_id: Option<u32>,
695 table_id: TableId,
696 cascade: bool,
697 ) -> Result<WaitVersion> {
698 let request = DropTableRequest {
699 source_id: source_id.map(SourceId::Id),
700 table_id: table_id.table_id(),
701 cascade,
702 };
703
704 let resp = self.inner.drop_table(request).await?;
705 Ok(resp
706 .version
707 .ok_or_else(|| anyhow!("wait version not set"))?)
708 }
709
710 pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
711 let request = DropViewRequest { view_id, cascade };
712 let resp = self.inner.drop_view(request).await?;
713 Ok(resp
714 .version
715 .ok_or_else(|| anyhow!("wait version not set"))?)
716 }
717
718 pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
719 let request = DropSourceRequest { source_id, cascade };
720 let resp = self.inner.drop_source(request).await?;
721 Ok(resp
722 .version
723 .ok_or_else(|| anyhow!("wait version not set"))?)
724 }
725
726 pub async fn drop_sink(
727 &self,
728 sink_id: u32,
729 cascade: bool,
730 affected_table_change: Option<ReplaceJobPlan>,
731 ) -> Result<WaitVersion> {
732 let request = DropSinkRequest {
733 sink_id,
734 cascade,
735 affected_table_change,
736 };
737 let resp = self.inner.drop_sink(request).await?;
738 Ok(resp
739 .version
740 .ok_or_else(|| anyhow!("wait version not set"))?)
741 }
742
743 pub async fn drop_subscription(
744 &self,
745 subscription_id: u32,
746 cascade: bool,
747 ) -> Result<WaitVersion> {
748 let request = DropSubscriptionRequest {
749 subscription_id,
750 cascade,
751 };
752 let resp = self.inner.drop_subscription(request).await?;
753 Ok(resp
754 .version
755 .ok_or_else(|| anyhow!("wait version not set"))?)
756 }
757
758 pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
759 let request = DropIndexRequest {
760 index_id: index_id.index_id,
761 cascade,
762 };
763 let resp = self.inner.drop_index(request).await?;
764 Ok(resp
765 .version
766 .ok_or_else(|| anyhow!("wait version not set"))?)
767 }
768
769 pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
770 let request = DropFunctionRequest {
771 function_id: function_id.0,
772 };
773 let resp = self.inner.drop_function(request).await?;
774 Ok(resp
775 .version
776 .ok_or_else(|| anyhow!("wait version not set"))?)
777 }
778
779 pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
780 let request = DropDatabaseRequest { database_id };
781 let resp = self.inner.drop_database(request).await?;
782 Ok(resp
783 .version
784 .ok_or_else(|| anyhow!("wait version not set"))?)
785 }
786
787 pub async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<WaitVersion> {
788 let request = DropSchemaRequest { schema_id, cascade };
789 let resp = self.inner.drop_schema(request).await?;
790 Ok(resp
791 .version
792 .ok_or_else(|| anyhow!("wait version not set"))?)
793 }
794
795 pub async fn create_user(&self, user: UserInfo) -> Result<u64> {
797 let request = CreateUserRequest { user: Some(user) };
798 let resp = self.inner.create_user(request).await?;
799 Ok(resp.version)
800 }
801
802 pub async fn drop_user(&self, user_id: u32) -> Result<u64> {
803 let request = DropUserRequest { user_id };
804 let resp = self.inner.drop_user(request).await?;
805 Ok(resp.version)
806 }
807
808 pub async fn update_user(
809 &self,
810 user: UserInfo,
811 update_fields: Vec<UpdateField>,
812 ) -> Result<u64> {
813 let request = UpdateUserRequest {
814 user: Some(user),
815 update_fields: update_fields
816 .into_iter()
817 .map(|field| field as i32)
818 .collect::<Vec<_>>(),
819 };
820 let resp = self.inner.update_user(request).await?;
821 Ok(resp.version)
822 }
823
824 pub async fn grant_privilege(
825 &self,
826 user_ids: Vec<u32>,
827 privileges: Vec<GrantPrivilege>,
828 with_grant_option: bool,
829 granted_by: u32,
830 ) -> Result<u64> {
831 let request = GrantPrivilegeRequest {
832 user_ids,
833 privileges,
834 with_grant_option,
835 granted_by,
836 };
837 let resp = self.inner.grant_privilege(request).await?;
838 Ok(resp.version)
839 }
840
841 pub async fn revoke_privilege(
842 &self,
843 user_ids: Vec<u32>,
844 privileges: Vec<GrantPrivilege>,
845 granted_by: u32,
846 revoke_by: u32,
847 revoke_grant_option: bool,
848 cascade: bool,
849 ) -> Result<u64> {
850 let request = RevokePrivilegeRequest {
851 user_ids,
852 privileges,
853 granted_by,
854 revoke_by,
855 revoke_grant_option,
856 cascade,
857 };
858 let resp = self.inner.revoke_privilege(request).await?;
859 Ok(resp.version)
860 }
861
862 pub async fn unregister(&self) -> Result<()> {
864 let request = DeleteWorkerNodeRequest {
865 host: Some(self.host_addr.to_protobuf()),
866 };
867 self.inner.delete_worker_node(request).await?;
868 self.shutting_down.store(true, Relaxed);
869 Ok(())
870 }
871
872 pub async fn try_unregister(&self) {
874 match self.unregister().await {
875 Ok(_) => {
876 tracing::info!(
877 worker_id = self.worker_id(),
878 "successfully unregistered from meta service",
879 )
880 }
881 Err(e) => {
882 tracing::warn!(
883 error = %e.as_report(),
884 worker_id = self.worker_id(),
885 "failed to unregister from meta service",
886 );
887 }
888 }
889 }
890
891 pub async fn update_schedulability(
892 &self,
893 worker_ids: &[u32],
894 schedulability: Schedulability,
895 ) -> Result<UpdateWorkerNodeSchedulabilityResponse> {
896 let request = UpdateWorkerNodeSchedulabilityRequest {
897 worker_ids: worker_ids.to_vec(),
898 schedulability: schedulability.into(),
899 };
900 let resp = self
901 .inner
902 .update_worker_node_schedulability(request)
903 .await?;
904 Ok(resp)
905 }
906
907 pub async fn list_worker_nodes(
908 &self,
909 worker_type: Option<WorkerType>,
910 ) -> Result<Vec<WorkerNode>> {
911 let request = ListAllNodesRequest {
912 worker_type: worker_type.map(Into::into),
913 include_starting_nodes: true,
914 };
915 let resp = self.inner.list_all_nodes(request).await?;
916 Ok(resp.nodes)
917 }
918
919 pub fn start_heartbeat_loop(
921 meta_client: MetaClient,
922 min_interval: Duration,
923 ) -> (JoinHandle<()>, Sender<()>) {
924 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
925 let join_handle = tokio::spawn(async move {
926 let mut min_interval_ticker = tokio::time::interval(min_interval);
927 loop {
928 tokio::select! {
929 biased;
930 _ = &mut shutdown_rx => {
932 tracing::info!("Heartbeat loop is stopped");
933 return;
934 }
935 _ = min_interval_ticker.tick() => {},
937 }
938 tracing::debug!(target: "events::meta::client_heartbeat", "heartbeat");
939 match tokio::time::timeout(
940 min_interval * 3,
942 meta_client.send_heartbeat(meta_client.worker_id()),
943 )
944 .await
945 {
946 Ok(Ok(_)) => {}
947 Ok(Err(err)) => {
948 tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat");
949 }
950 Err(_) => {
951 tracing::warn!("Failed to send_heartbeat: timeout");
952 }
953 }
954 }
955 });
956 (join_handle, shutdown_tx)
957 }
958
959 pub async fn risectl_list_state_tables(&self) -> Result<Vec<PbTable>> {
960 let request = RisectlListStateTablesRequest {};
961 let resp = self.inner.risectl_list_state_tables(request).await?;
962 Ok(resp.tables)
963 }
964
965 pub async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
966 let request = FlushRequest { database_id };
967 let resp = self.inner.flush(request).await?;
968 Ok(HummockVersionId::new(resp.hummock_version_id))
969 }
970
971 pub async fn wait(&self) -> Result<()> {
972 let request = WaitRequest {};
973 self.inner.wait(request).await?;
974 Ok(())
975 }
976
977 pub async fn recover(&self) -> Result<()> {
978 let request = RecoverRequest {};
979 self.inner.recover(request).await?;
980 Ok(())
981 }
982
983 pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
984 let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
985 let resp = self.inner.cancel_creating_jobs(request).await?;
986 Ok(resp.canceled_jobs)
987 }
988
989 pub async fn list_table_fragments(
990 &self,
991 table_ids: &[u32],
992 ) -> Result<HashMap<u32, TableFragmentInfo>> {
993 let request = ListTableFragmentsRequest {
994 table_ids: table_ids.to_vec(),
995 };
996 let resp = self.inner.list_table_fragments(request).await?;
997 Ok(resp.table_fragments)
998 }
999
1000 pub async fn list_streaming_job_states(&self) -> Result<Vec<StreamingJobState>> {
1001 let resp = self
1002 .inner
1003 .list_streaming_job_states(ListStreamingJobStatesRequest {})
1004 .await?;
1005 Ok(resp.states)
1006 }
1007
1008 pub async fn list_fragment_distributions(&self) -> Result<Vec<FragmentDistribution>> {
1009 let resp = self
1010 .inner
1011 .list_fragment_distribution(ListFragmentDistributionRequest {})
1012 .await?;
1013 Ok(resp.distributions)
1014 }
1015
1016 pub async fn list_actor_states(&self) -> Result<Vec<ActorState>> {
1017 let resp = self
1018 .inner
1019 .list_actor_states(ListActorStatesRequest {})
1020 .await?;
1021 Ok(resp.states)
1022 }
1023
1024 pub async fn list_actor_splits(&self) -> Result<Vec<ActorSplit>> {
1025 let resp = self
1026 .inner
1027 .list_actor_splits(ListActorSplitsRequest {})
1028 .await?;
1029
1030 Ok(resp.actor_splits)
1031 }
1032
1033 pub async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>> {
1034 let resp = self
1035 .inner
1036 .list_object_dependencies(ListObjectDependenciesRequest {})
1037 .await?;
1038 Ok(resp.dependencies)
1039 }
1040
1041 pub async fn pause(&self) -> Result<PauseResponse> {
1042 let request = PauseRequest {};
1043 let resp = self.inner.pause(request).await?;
1044 Ok(resp)
1045 }
1046
1047 pub async fn resume(&self) -> Result<ResumeResponse> {
1048 let request = ResumeRequest {};
1049 let resp = self.inner.resume(request).await?;
1050 Ok(resp)
1051 }
1052
1053 pub async fn apply_throttle(
1054 &self,
1055 kind: PbThrottleTarget,
1056 id: u32,
1057 rate: Option<u32>,
1058 ) -> Result<ApplyThrottleResponse> {
1059 let request = ApplyThrottleRequest {
1060 kind: kind as i32,
1061 id,
1062 rate,
1063 };
1064 let resp = self.inner.apply_throttle(request).await?;
1065 Ok(resp)
1066 }
1067
1068 pub async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
1069 let resp = self
1070 .inner
1071 .get_cluster_recovery_status(GetClusterRecoveryStatusRequest {})
1072 .await?;
1073 Ok(resp.get_status().unwrap())
1074 }
1075
1076 pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
1077 let request = GetClusterInfoRequest {};
1078 let resp = self.inner.get_cluster_info(request).await?;
1079 Ok(resp)
1080 }
1081
1082 pub async fn reschedule(
1083 &self,
1084 worker_reschedules: HashMap<u32, PbWorkerReschedule>,
1085 revision: u64,
1086 resolve_no_shuffle_upstream: bool,
1087 ) -> Result<(bool, u64)> {
1088 let request = RescheduleRequest {
1089 revision,
1090 resolve_no_shuffle_upstream,
1091 worker_reschedules,
1092 };
1093 let resp = self.inner.reschedule(request).await?;
1094 Ok((resp.success, resp.revision))
1095 }
1096
1097 pub async fn risectl_get_pinned_versions_summary(
1098 &self,
1099 ) -> Result<RiseCtlGetPinnedVersionsSummaryResponse> {
1100 let request = RiseCtlGetPinnedVersionsSummaryRequest {};
1101 self.inner
1102 .rise_ctl_get_pinned_versions_summary(request)
1103 .await
1104 }
1105
1106 pub async fn risectl_get_checkpoint_hummock_version(
1107 &self,
1108 ) -> Result<RiseCtlGetCheckpointVersionResponse> {
1109 let request = RiseCtlGetCheckpointVersionRequest {};
1110 self.inner.rise_ctl_get_checkpoint_version(request).await
1111 }
1112
1113 pub async fn risectl_pause_hummock_version_checkpoint(
1114 &self,
1115 ) -> Result<RiseCtlPauseVersionCheckpointResponse> {
1116 let request = RiseCtlPauseVersionCheckpointRequest {};
1117 self.inner.rise_ctl_pause_version_checkpoint(request).await
1118 }
1119
1120 pub async fn risectl_resume_hummock_version_checkpoint(
1121 &self,
1122 ) -> Result<RiseCtlResumeVersionCheckpointResponse> {
1123 let request = RiseCtlResumeVersionCheckpointRequest {};
1124 self.inner.rise_ctl_resume_version_checkpoint(request).await
1125 }
1126
1127 pub async fn init_metadata_for_replay(
1128 &self,
1129 tables: Vec<PbTable>,
1130 compaction_groups: Vec<CompactionGroupInfo>,
1131 ) -> Result<()> {
1132 let req = InitMetadataForReplayRequest {
1133 tables,
1134 compaction_groups,
1135 };
1136 let _resp = self.inner.init_metadata_for_replay(req).await?;
1137 Ok(())
1138 }
1139
1140 pub async fn replay_version_delta(
1141 &self,
1142 version_delta: HummockVersionDelta,
1143 ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
1144 let req = ReplayVersionDeltaRequest {
1145 version_delta: Some(version_delta.into()),
1146 };
1147 let resp = self.inner.replay_version_delta(req).await?;
1148 Ok((
1149 HummockVersion::from_rpc_protobuf(&resp.version.unwrap()),
1150 resp.modified_compaction_groups,
1151 ))
1152 }
1153
1154 pub async fn list_version_deltas(
1155 &self,
1156 start_id: HummockVersionId,
1157 num_limit: u32,
1158 committed_epoch_limit: HummockEpoch,
1159 ) -> Result<Vec<HummockVersionDelta>> {
1160 let req = ListVersionDeltasRequest {
1161 start_id: start_id.to_u64(),
1162 num_limit,
1163 committed_epoch_limit,
1164 };
1165 Ok(self
1166 .inner
1167 .list_version_deltas(req)
1168 .await?
1169 .version_deltas
1170 .unwrap()
1171 .version_deltas
1172 .iter()
1173 .map(HummockVersionDelta::from_rpc_protobuf)
1174 .collect())
1175 }
1176
1177 pub async fn trigger_compaction_deterministic(
1178 &self,
1179 version_id: HummockVersionId,
1180 compaction_groups: Vec<CompactionGroupId>,
1181 ) -> Result<()> {
1182 let req = TriggerCompactionDeterministicRequest {
1183 version_id: version_id.to_u64(),
1184 compaction_groups,
1185 };
1186 self.inner.trigger_compaction_deterministic(req).await?;
1187 Ok(())
1188 }
1189
1190 pub async fn disable_commit_epoch(&self) -> Result<HummockVersion> {
1191 let req = DisableCommitEpochRequest {};
1192 Ok(HummockVersion::from_rpc_protobuf(
1193 &self
1194 .inner
1195 .disable_commit_epoch(req)
1196 .await?
1197 .current_version
1198 .unwrap(),
1199 ))
1200 }
1201
1202 pub async fn get_assigned_compact_task_num(&self) -> Result<usize> {
1203 let req = GetAssignedCompactTaskNumRequest {};
1204 let resp = self.inner.get_assigned_compact_task_num(req).await?;
1205 Ok(resp.num_tasks as usize)
1206 }
1207
1208 pub async fn risectl_list_compaction_group(&self) -> Result<Vec<CompactionGroupInfo>> {
1209 let req = RiseCtlListCompactionGroupRequest {};
1210 let resp = self.inner.rise_ctl_list_compaction_group(req).await?;
1211 Ok(resp.compaction_groups)
1212 }
1213
1214 pub async fn risectl_update_compaction_config(
1215 &self,
1216 compaction_groups: &[CompactionGroupId],
1217 configs: &[MutableConfig],
1218 ) -> Result<()> {
1219 let req = RiseCtlUpdateCompactionConfigRequest {
1220 compaction_group_ids: compaction_groups.to_vec(),
1221 configs: configs
1222 .iter()
1223 .map(
1224 |c| rise_ctl_update_compaction_config_request::MutableConfig {
1225 mutable_config: Some(c.clone()),
1226 },
1227 )
1228 .collect(),
1229 };
1230 let _resp = self.inner.rise_ctl_update_compaction_config(req).await?;
1231 Ok(())
1232 }
1233
1234 pub async fn backup_meta(&self, remarks: Option<String>) -> Result<u64> {
1235 let req = BackupMetaRequest { remarks };
1236 let resp = self.inner.backup_meta(req).await?;
1237 Ok(resp.job_id)
1238 }
1239
1240 pub async fn get_backup_job_status(&self, job_id: u64) -> Result<(BackupJobStatus, String)> {
1241 let req = GetBackupJobStatusRequest { job_id };
1242 let resp = self.inner.get_backup_job_status(req).await?;
1243 Ok((resp.job_status(), resp.message))
1244 }
1245
1246 pub async fn delete_meta_snapshot(&self, snapshot_ids: &[u64]) -> Result<()> {
1247 let req = DeleteMetaSnapshotRequest {
1248 snapshot_ids: snapshot_ids.to_vec(),
1249 };
1250 let _resp = self.inner.delete_meta_snapshot(req).await?;
1251 Ok(())
1252 }
1253
1254 pub async fn get_meta_snapshot_manifest(&self) -> Result<MetaSnapshotManifest> {
1255 let req = GetMetaSnapshotManifestRequest {};
1256 let resp = self.inner.get_meta_snapshot_manifest(req).await?;
1257 Ok(resp.manifest.expect("should exist"))
1258 }
1259
1260 pub async fn get_telemetry_info(&self) -> Result<TelemetryInfoResponse> {
1261 let req = GetTelemetryInfoRequest {};
1262 let resp = self.inner.get_telemetry_info(req).await?;
1263 Ok(resp)
1264 }
1265
1266 pub async fn get_system_params(&self) -> Result<SystemParamsReader> {
1267 let req = GetSystemParamsRequest {};
1268 let resp = self.inner.get_system_params(req).await?;
1269 Ok(resp.params.unwrap().into())
1270 }
1271
1272 pub async fn get_meta_store_endpoint(&self) -> Result<String> {
1273 let req = GetMetaStoreInfoRequest {};
1274 let resp = self.inner.get_meta_store_info(req).await?;
1275 Ok(resp.meta_store_endpoint)
1276 }
1277
1278 pub async fn set_system_param(
1279 &self,
1280 param: String,
1281 value: Option<String>,
1282 ) -> Result<Option<SystemParamsReader>> {
1283 let req = SetSystemParamRequest { param, value };
1284 let resp = self.inner.set_system_param(req).await?;
1285 Ok(resp.params.map(SystemParamsReader::from))
1286 }
1287
1288 pub async fn get_session_params(&self) -> Result<String> {
1289 let req = GetSessionParamsRequest {};
1290 let resp = self.inner.get_session_params(req).await?;
1291 Ok(resp.params)
1292 }
1293
1294 pub async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String> {
1295 let req = SetSessionParamRequest { param, value };
1296 let resp = self.inner.set_session_param(req).await?;
1297 Ok(resp.param)
1298 }
1299
1300 pub async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
1301 let req = GetDdlProgressRequest {};
1302 let resp = self.inner.get_ddl_progress(req).await?;
1303 Ok(resp.ddl_progress)
1304 }
1305
1306 pub async fn split_compaction_group(
1307 &self,
1308 group_id: CompactionGroupId,
1309 table_ids_to_new_group: &[StateTableId],
1310 partition_vnode_count: u32,
1311 ) -> Result<CompactionGroupId> {
1312 let req = SplitCompactionGroupRequest {
1313 group_id,
1314 table_ids: table_ids_to_new_group.to_vec(),
1315 partition_vnode_count,
1316 };
1317 let resp = self.inner.split_compaction_group(req).await?;
1318 Ok(resp.new_group_id)
1319 }
1320
1321 pub async fn get_tables(
1322 &self,
1323 table_ids: &[u32],
1324 include_dropped_tables: bool,
1325 ) -> Result<HashMap<u32, Table>> {
1326 let req = GetTablesRequest {
1327 table_ids: table_ids.to_vec(),
1328 include_dropped_tables,
1329 };
1330 let resp = self.inner.get_tables(req).await?;
1331 Ok(resp.tables)
1332 }
1333
1334 pub async fn list_serving_vnode_mappings(
1335 &self,
1336 ) -> Result<HashMap<u32, (u32, WorkerSlotMapping)>> {
1337 let req = GetServingVnodeMappingsRequest {};
1338 let resp = self.inner.get_serving_vnode_mappings(req).await?;
1339 let mappings = resp
1340 .worker_slot_mappings
1341 .into_iter()
1342 .map(|p| {
1343 (
1344 p.fragment_id,
1345 (
1346 resp.fragment_to_table
1347 .get(&p.fragment_id)
1348 .cloned()
1349 .unwrap_or(0),
1350 WorkerSlotMapping::from_protobuf(p.mapping.as_ref().unwrap()),
1351 ),
1352 )
1353 })
1354 .collect();
1355 Ok(mappings)
1356 }
1357
1358 pub async fn risectl_list_compaction_status(
1359 &self,
1360 ) -> Result<(
1361 Vec<CompactStatus>,
1362 Vec<CompactTaskAssignment>,
1363 Vec<CompactTaskProgress>,
1364 )> {
1365 let req = RiseCtlListCompactionStatusRequest {};
1366 let resp = self.inner.rise_ctl_list_compaction_status(req).await?;
1367 Ok((
1368 resp.compaction_statuses,
1369 resp.task_assignment,
1370 resp.task_progress,
1371 ))
1372 }
1373
1374 pub async fn get_compaction_score(
1375 &self,
1376 compaction_group_id: CompactionGroupId,
1377 ) -> Result<Vec<PickerInfo>> {
1378 let req = GetCompactionScoreRequest {
1379 compaction_group_id,
1380 };
1381 let resp = self.inner.get_compaction_score(req).await?;
1382 Ok(resp.scores)
1383 }
1384
1385 pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
1386 let req = RiseCtlRebuildTableStatsRequest {};
1387 let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
1388 Ok(())
1389 }
1390
1391 pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
1392 let req = ListBranchedObjectRequest {};
1393 let resp = self.inner.list_branched_object(req).await?;
1394 Ok(resp.branched_objects)
1395 }
1396
1397 pub async fn list_active_write_limit(&self) -> Result<HashMap<u64, WriteLimit>> {
1398 let req = ListActiveWriteLimitRequest {};
1399 let resp = self.inner.list_active_write_limit(req).await?;
1400 Ok(resp.write_limits)
1401 }
1402
1403 pub async fn list_hummock_meta_config(&self) -> Result<HashMap<String, String>> {
1404 let req = ListHummockMetaConfigRequest {};
1405 let resp = self.inner.list_hummock_meta_config(req).await?;
1406 Ok(resp.configs)
1407 }
1408
1409 pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
1410 let _resp = self
1411 .inner
1412 .delete_worker_node(DeleteWorkerNodeRequest { host: Some(worker) })
1413 .await?;
1414
1415 Ok(())
1416 }
1417
1418 pub async fn rw_cloud_validate_source(
1419 &self,
1420 source_type: SourceType,
1421 source_config: HashMap<String, String>,
1422 ) -> Result<RwCloudValidateSourceResponse> {
1423 let req = RwCloudValidateSourceRequest {
1424 source_type: source_type.into(),
1425 source_config,
1426 };
1427 let resp = self.inner.rw_cloud_validate_source(req).await?;
1428 Ok(resp)
1429 }
1430
1431 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClient {
1432 self.inner.core.read().await.sink_coordinate_client.clone()
1433 }
1434
1435 pub async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
1436 let req = ListCompactTaskAssignmentRequest {};
1437 let resp = self.inner.list_compact_task_assignment(req).await?;
1438 Ok(resp.task_assignment)
1439 }
1440
1441 pub async fn list_event_log(&self) -> Result<Vec<EventLog>> {
1442 let req = ListEventLogRequest::default();
1443 let resp = self.inner.list_event_log(req).await?;
1444 Ok(resp.event_logs)
1445 }
1446
1447 pub async fn list_compact_task_progress(&self) -> Result<Vec<CompactTaskProgress>> {
1448 let req = ListCompactTaskProgressRequest {};
1449 let resp = self.inner.list_compact_task_progress(req).await?;
1450 Ok(resp.task_progress)
1451 }
1452
    /// No-op variant compiled when the `madsim` cfg is set: both arguments are ignored
    /// and nothing is sent.
    #[cfg(madsim)]
    pub fn try_add_panic_event_blocking(
        &self,
        panic_info: impl Display,
        timeout_millis: Option<u64>,
    ) {
    }
1460
1461 #[cfg(not(madsim))]
1463 pub fn try_add_panic_event_blocking(
1464 &self,
1465 panic_info: impl Display,
1466 timeout_millis: Option<u64>,
1467 ) {
1468 let event = event_log::EventWorkerNodePanic {
1469 worker_id: self.worker_id,
1470 worker_type: self.worker_type.into(),
1471 host_addr: Some(self.host_addr.to_protobuf()),
1472 panic_info: format!("{panic_info}"),
1473 };
1474 let grpc_meta_client = self.inner.clone();
1475 let _ = thread::spawn(move || {
1476 let rt = tokio::runtime::Builder::new_current_thread()
1477 .enable_all()
1478 .build()
1479 .unwrap();
1480 let req = AddEventLogRequest {
1481 event: Some(add_event_log_request::Event::WorkerNodePanic(event)),
1482 };
1483 rt.block_on(async {
1484 let _ = tokio::time::timeout(
1485 Duration::from_millis(timeout_millis.unwrap_or(1000)),
1486 grpc_meta_client.add_event_log(req),
1487 )
1488 .await;
1489 });
1490 })
1491 .join();
1492 }
1493
1494 pub async fn add_sink_fail_evet(
1495 &self,
1496 sink_id: u32,
1497 sink_name: String,
1498 connector: String,
1499 error: String,
1500 ) -> Result<()> {
1501 let event = event_log::EventSinkFail {
1502 sink_id,
1503 sink_name,
1504 connector,
1505 error,
1506 };
1507 let req = AddEventLogRequest {
1508 event: Some(add_event_log_request::Event::SinkFail(event)),
1509 };
1510 self.inner.add_event_log(req).await?;
1511 Ok(())
1512 }
1513
1514 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1515 let req = CancelCompactTaskRequest {
1516 task_id,
1517 task_status: task_status as _,
1518 };
1519 let resp = self.inner.cancel_compact_task(req).await?;
1520 Ok(resp.ret)
1521 }
1522
1523 pub async fn get_version_by_epoch(
1524 &self,
1525 epoch: HummockEpoch,
1526 table_id: u32,
1527 ) -> Result<PbHummockVersion> {
1528 let req = GetVersionByEpochRequest { epoch, table_id };
1529 let resp = self.inner.get_version_by_epoch(req).await?;
1530 Ok(resp.version.unwrap())
1531 }
1532
1533 pub async fn get_cluster_limits(
1534 &self,
1535 ) -> Result<Vec<risingwave_common::util::cluster_limit::ClusterLimit>> {
1536 let req = GetClusterLimitsRequest {};
1537 let resp = self.inner.get_cluster_limits(req).await?;
1538 Ok(resp.active_limits.into_iter().map(|l| l.into()).collect())
1539 }
1540
1541 pub async fn merge_compaction_group(
1542 &self,
1543 left_group_id: CompactionGroupId,
1544 right_group_id: CompactionGroupId,
1545 ) -> Result<()> {
1546 let req = MergeCompactionGroupRequest {
1547 left_group_id,
1548 right_group_id,
1549 };
1550 self.inner.merge_compaction_group(req).await?;
1551 Ok(())
1552 }
1553
1554 pub async fn list_rate_limits(&self) -> Result<Vec<RateLimitInfo>> {
1556 let request = ListRateLimitsRequest {};
1557 let resp = self.inner.list_rate_limits(request).await?;
1558 Ok(resp.rate_limits)
1559 }
1560}
1561
1562#[async_trait]
1563impl HummockMetaClient for MetaClient {
1564 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
1565 let req = UnpinVersionBeforeRequest {
1566 context_id: self.worker_id(),
1567 unpin_version_before: unpin_version_before.to_u64(),
1568 };
1569 self.inner.unpin_version_before(req).await?;
1570 Ok(())
1571 }
1572
1573 async fn get_current_version(&self) -> Result<HummockVersion> {
1574 let req = GetCurrentVersionRequest::default();
1575 Ok(HummockVersion::from_rpc_protobuf(
1576 &self
1577 .inner
1578 .get_current_version(req)
1579 .await?
1580 .current_version
1581 .unwrap(),
1582 ))
1583 }
1584
1585 async fn get_new_sst_ids(&self, number: u32) -> Result<SstObjectIdRange> {
1586 let resp = self
1587 .inner
1588 .get_new_sst_ids(GetNewSstIdsRequest { number })
1589 .await?;
1590 Ok(SstObjectIdRange::new(resp.start_id, resp.end_id))
1591 }
1592
1593 async fn commit_epoch_with_change_log(
1594 &self,
1595 _epoch: HummockEpoch,
1596 _sync_result: SyncResult,
1597 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
1598 ) -> Result<()> {
1599 panic!("Only meta service can commit_epoch in production.")
1600 }
1601
1602 async fn trigger_manual_compaction(
1603 &self,
1604 compaction_group_id: u64,
1605 table_id: u32,
1606 level: u32,
1607 sst_ids: Vec<u64>,
1608 ) -> Result<()> {
1609 let req = TriggerManualCompactionRequest {
1611 compaction_group_id,
1612 table_id,
1613 level,
1616 sst_ids,
1617 ..Default::default()
1618 };
1619
1620 self.inner.trigger_manual_compaction(req).await?;
1621 Ok(())
1622 }
1623
1624 async fn trigger_full_gc(
1625 &self,
1626 sst_retention_time_sec: u64,
1627 prefix: Option<String>,
1628 ) -> Result<()> {
1629 self.inner
1630 .trigger_full_gc(TriggerFullGcRequest {
1631 sst_retention_time_sec,
1632 prefix,
1633 })
1634 .await?;
1635 Ok(())
1636 }
1637
1638 async fn subscribe_compaction_event(
1639 &self,
1640 ) -> Result<(
1641 UnboundedSender<SubscribeCompactionEventRequest>,
1642 BoxStream<'static, CompactionEventItem>,
1643 )> {
1644 let (request_sender, request_receiver) =
1645 unbounded_channel::<SubscribeCompactionEventRequest>();
1646 request_sender
1647 .send(SubscribeCompactionEventRequest {
1648 event: Some(subscribe_compaction_event_request::Event::Register(
1649 Register {
1650 context_id: self.worker_id(),
1651 },
1652 )),
1653 create_at: SystemTime::now()
1654 .duration_since(SystemTime::UNIX_EPOCH)
1655 .expect("Clock may have gone backwards")
1656 .as_millis() as u64,
1657 })
1658 .context("Failed to subscribe compaction event")?;
1659
1660 let stream = self
1661 .inner
1662 .subscribe_compaction_event(Request::new(UnboundedReceiverStream::new(
1663 request_receiver,
1664 )))
1665 .await?;
1666
1667 Ok((request_sender, Box::pin(stream)))
1668 }
1669
1670 async fn get_version_by_epoch(
1671 &self,
1672 epoch: HummockEpoch,
1673 table_id: u32,
1674 ) -> Result<PbHummockVersion> {
1675 self.get_version_by_epoch(epoch, table_id).await
1676 }
1677}
1678
1679#[async_trait]
1680impl TelemetryInfoFetcher for MetaClient {
1681 async fn fetch_telemetry_info(&self) -> std::result::Result<Option<String>, String> {
1682 let resp = self
1683 .get_telemetry_info()
1684 .await
1685 .map_err(|e| e.to_report_string())?;
1686 let tracking_id = resp.get_tracking_id().ok();
1687 Ok(tracking_id.map(|id| id.to_owned()))
1688 }
1689}
1690
/// Sink coordination service client running over a `Channel`.
pub type SinkCoordinationRpcClient = SinkCoordinationServiceClient<Channel>;
1692
/// One client per meta service. `GrpcMetaClientCore::new` builds every client from
/// clones of a single `Channel`.
#[derive(Debug, Clone)]
struct GrpcMetaClientCore {
    cluster_client: ClusterServiceClient<Channel>,
    meta_member_client: MetaMemberServiceClient<Channel>,
    heartbeat_client: HeartbeatServiceClient<Channel>,
    ddl_client: DdlServiceClient<Channel>,
    hummock_client: HummockManagerServiceClient<Channel>,
    notification_client: NotificationServiceClient<Channel>,
    stream_client: StreamManagerServiceClient<Channel>,
    user_client: UserServiceClient<Channel>,
    scale_client: ScaleServiceClient<Channel>,
    backup_client: BackupServiceClient<Channel>,
    telemetry_client: TelemetryInfoServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
    session_params_client: SessionParamServiceClient<Channel>,
    serving_client: ServingServiceClient<Channel>,
    cloud_client: CloudServiceClient<Channel>,
    sink_coordinate_client: SinkCoordinationRpcClient,
    event_log_client: EventLogServiceClient<Channel>,
    cluster_limit_client: ClusterLimitServiceClient<Channel>,
}
1714
1715impl GrpcMetaClientCore {
1716 pub(crate) fn new(channel: Channel) -> Self {
1717 let cluster_client = ClusterServiceClient::new(channel.clone());
1718 let meta_member_client = MetaMemberClient::new(channel.clone());
1719 let heartbeat_client = HeartbeatServiceClient::new(channel.clone());
1720 let ddl_client =
1721 DdlServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1722 let hummock_client =
1723 HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1724 let notification_client =
1725 NotificationServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1726 let stream_client =
1727 StreamManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1728 let user_client = UserServiceClient::new(channel.clone());
1729 let scale_client =
1730 ScaleServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1731 let backup_client = BackupServiceClient::new(channel.clone());
1732 let telemetry_client =
1733 TelemetryInfoServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
1734 let system_params_client = SystemParamsServiceClient::new(channel.clone());
1735 let session_params_client = SessionParamServiceClient::new(channel.clone());
1736 let serving_client = ServingServiceClient::new(channel.clone());
1737 let cloud_client = CloudServiceClient::new(channel.clone());
1738 let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone());
1739 let event_log_client = EventLogServiceClient::new(channel.clone());
1740 let cluster_limit_client = ClusterLimitServiceClient::new(channel);
1741
1742 GrpcMetaClientCore {
1743 cluster_client,
1744 meta_member_client,
1745 heartbeat_client,
1746 ddl_client,
1747 hummock_client,
1748 notification_client,
1749 stream_client,
1750 user_client,
1751 scale_client,
1752 backup_client,
1753 telemetry_client,
1754 system_params_client,
1755 session_params_client,
1756 serving_client,
1757 cloud_client,
1758 sink_coordinate_client,
1759 event_log_client,
1760 cluster_limit_client,
1761 }
1762 }
1763}
1764
/// Handle to the meta gRPC clients, shared with a background member-monitor task.
#[derive(Debug, Clone)]
struct GrpcMetaClient {
    /// Channel to the member-monitor task. `force_refresh_leader` sends a oneshot
    /// sender through it and waits for the refresh result on the paired receiver.
    member_monitor_event_sender: mpsc::Sender<Sender<Result<()>>>,
    /// Current set of service clients. `MetaMemberManagement::recreate_core` replaces
    /// the value behind the lock when a new leader is discovered.
    core: Arc<RwLock<GrpcMetaClientCore>>,
}
1773
/// Meta member service client running over a `Channel`.
type MetaMemberClient = MetaMemberServiceClient<Channel>;
1775
/// Known meta members, keyed by address.
struct MetaMemberGroup {
    /// LRU cache from member URI to its client. An entry is `None` until
    /// `refresh_members` connects to that member and stores the client.
    members: LruCache<http::Uri, Option<MetaMemberClient>>,
}
1779
/// State owned by the member-monitor task for tracking the meta leader.
struct MetaMemberManagement {
    /// Shared client core, overwritten when a different leader is discovered.
    core_ref: Arc<RwLock<GrpcMetaClientCore>>,
    /// Either a single member client (`Left`) or a group of known members (`Right`).
    members: Either<MetaMemberClient, MetaMemberGroup>,
    /// Leader address the core is currently connected to.
    current_leader: http::Uri,
    /// Meta configuration; `meta_leader_lease_secs` bounds the reconnect retries.
    meta_config: MetaConfig,
}
1786
1787impl MetaMemberManagement {
1788 const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);
1789
1790 fn host_address_to_uri(addr: HostAddress) -> http::Uri {
1791 format!("http://{}:{}", addr.host, addr.port)
1792 .parse()
1793 .unwrap()
1794 }
1795
1796 async fn recreate_core(&self, channel: Channel) {
1797 let mut core = self.core_ref.write().await;
1798 *core = GrpcMetaClientCore::new(channel);
1799 }
1800
1801 async fn refresh_members(&mut self) -> Result<()> {
1802 let leader_addr = match self.members.as_mut() {
1803 Either::Left(client) => {
1804 let resp = client
1805 .to_owned()
1806 .members(MembersRequest {})
1807 .await
1808 .map_err(RpcError::from_meta_status)?;
1809 let resp = resp.into_inner();
1810 resp.members.into_iter().find(|member| member.is_leader)
1811 }
1812 Either::Right(member_group) => {
1813 let mut fetched_members = None;
1814
1815 for (addr, client) in &mut member_group.members {
1816 let members: Result<_> = try {
1817 let mut client = match client {
1818 Some(cached_client) => cached_client.to_owned(),
1819 None => {
1820 let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
1821 let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
1822 .await
1823 .context("failed to create client")?;
1824 let new_client: MetaMemberClient =
1825 MetaMemberServiceClient::new(channel);
1826 *client = Some(new_client.clone());
1827
1828 new_client
1829 }
1830 };
1831
1832 let resp = client
1833 .members(MembersRequest {})
1834 .await
1835 .context("failed to fetch members")?;
1836
1837 resp.into_inner().members
1838 };
1839
1840 let fetched = members.is_ok();
1841 fetched_members = Some(members);
1842 if fetched {
1843 break;
1844 }
1845 }
1846
1847 let members = fetched_members
1848 .context("no member available in the list")?
1849 .context("could not refresh members")?;
1850
1851 let mut leader = None;
1853 for member in members {
1854 if member.is_leader {
1855 leader = Some(member.clone());
1856 }
1857
1858 let addr = Self::host_address_to_uri(member.address.unwrap());
1859 if !member_group.members.contains(&addr) {
1861 tracing::info!("new meta member joined: {}", addr);
1862 member_group.members.put(addr, None);
1863 }
1864 }
1865
1866 leader
1867 }
1868 };
1869
1870 if let Some(leader) = leader_addr {
1871 let discovered_leader = Self::host_address_to_uri(leader.address.unwrap());
1872
1873 if discovered_leader != self.current_leader {
1874 tracing::info!("new meta leader {} discovered", discovered_leader);
1875
1876 let retry_strategy = GrpcMetaClient::retry_strategy_to_bound(
1877 Duration::from_secs(self.meta_config.meta_leader_lease_secs),
1878 false,
1879 );
1880
1881 let channel = tokio_retry::Retry::spawn(retry_strategy, || async {
1882 let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone());
1883 GrpcMetaClient::connect_to_endpoint(endpoint).await
1884 })
1885 .await?;
1886
1887 self.recreate_core(channel).await;
1888 self.current_leader = discovered_leader;
1889 }
1890 }
1891
1892 Ok(())
1893 }
1894}
1895
1896impl GrpcMetaClient {
1897 const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
1899 const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;
1901 const INIT_RETRY_BASE_INTERVAL_MS: u64 = 10;
1903 const INIT_RETRY_MAX_INTERVAL_MS: u64 = 2000;
1905
1906 fn start_meta_member_monitor(
1907 &self,
1908 init_leader_addr: http::Uri,
1909 members: Either<MetaMemberClient, MetaMemberGroup>,
1910 force_refresh_receiver: Receiver<Sender<Result<()>>>,
1911 meta_config: MetaConfig,
1912 ) -> Result<()> {
1913 let core_ref: Arc<RwLock<GrpcMetaClientCore>> = self.core.clone();
1914 let current_leader = init_leader_addr;
1915
1916 let enable_period_tick = matches!(members, Either::Right(_));
1917
1918 let member_management = MetaMemberManagement {
1919 core_ref,
1920 members,
1921 current_leader,
1922 meta_config,
1923 };
1924
1925 let mut force_refresh_receiver = force_refresh_receiver;
1926
1927 tokio::spawn(async move {
1928 let mut member_management = member_management;
1929 let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD);
1930
1931 loop {
1932 let event: Option<Sender<Result<()>>> = if enable_period_tick {
1933 tokio::select! {
1934 _ = ticker.tick() => None,
1935 result_sender = force_refresh_receiver.recv() => {
1936 if result_sender.is_none() {
1937 break;
1938 }
1939
1940 result_sender
1941 },
1942 }
1943 } else {
1944 let result_sender = force_refresh_receiver.recv().await;
1945
1946 if result_sender.is_none() {
1947 break;
1948 }
1949
1950 result_sender
1951 };
1952
1953 let tick_result = member_management.refresh_members().await;
1954 if let Err(e) = tick_result.as_ref() {
1955 tracing::warn!(error = %e.as_report(), "refresh meta member client failed");
1956 }
1957
1958 if let Some(sender) = event {
1959 let _resp = sender.send(tick_result);
1961 }
1962 }
1963 });
1964
1965 Ok(())
1966 }
1967
1968 async fn force_refresh_leader(&self) -> Result<()> {
1969 let (sender, receiver) = oneshot::channel();
1970
1971 self.member_monitor_event_sender
1972 .send(sender)
1973 .await
1974 .map_err(|e| anyhow!(e))?;
1975
1976 receiver.await.map_err(|e| anyhow!(e))?
1977 }
1978
1979 pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result<Self> {
1981 let (channel, addr) = match strategy {
1982 MetaAddressStrategy::LoadBalance(addr) => {
1983 Self::try_build_rpc_channel(vec![addr.clone()]).await
1984 }
1985 MetaAddressStrategy::List(addrs) => Self::try_build_rpc_channel(addrs.clone()).await,
1986 }?;
1987 let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);
1988 let client = GrpcMetaClient {
1989 member_monitor_event_sender: force_refresh_sender,
1990 core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
1991 };
1992
1993 let meta_member_client = client.core.read().await.meta_member_client.clone();
1994 let members = match strategy {
1995 MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client),
1996 MetaAddressStrategy::List(addrs) => {
1997 let mut members = LruCache::new(20);
1998 for addr in addrs {
1999 members.put(addr.clone(), None);
2000 }
2001 members.put(addr.clone(), Some(meta_member_client));
2002
2003 Either::Right(MetaMemberGroup { members })
2004 }
2005 };
2006
2007 client.start_meta_member_monitor(addr, members, force_refresh_receiver, config)?;
2008
2009 client.force_refresh_leader().await?;
2010
2011 Ok(client)
2012 }
2013
2014 fn addr_to_endpoint(addr: http::Uri) -> Endpoint {
2015 Endpoint::from(addr).initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
2016 }
2017
2018 pub(crate) async fn try_build_rpc_channel(
2019 addrs: impl IntoIterator<Item = http::Uri>,
2020 ) -> Result<(Channel, http::Uri)> {
2021 let endpoints: Vec<_> = addrs
2022 .into_iter()
2023 .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
2024 .collect();
2025
2026 let mut last_error = None;
2027
2028 for (endpoint, addr) in endpoints {
2029 match Self::connect_to_endpoint(endpoint).await {
2030 Ok(channel) => {
2031 tracing::info!("Connect to meta server {} successfully", addr);
2032 return Ok((channel, addr));
2033 }
2034 Err(e) => {
2035 tracing::warn!(
2036 error = %e.as_report(),
2037 "Failed to connect to meta server {}, trying again",
2038 addr,
2039 );
2040 last_error = Some(e);
2041 }
2042 }
2043 }
2044
2045 if let Some(last_error) = last_error {
2046 Err(anyhow::anyhow!(last_error)
2047 .context("failed to connect to all meta servers")
2048 .into())
2049 } else {
2050 bail!("no meta server address provided")
2051 }
2052 }
2053
2054 async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
2055 let channel = endpoint
2056 .http2_keep_alive_interval(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
2057 .keep_alive_timeout(Duration::from_secs(Self::ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
2058 .connect_timeout(Duration::from_secs(5))
2059 .monitored_connect("grpc-meta-client", Default::default())
2060 .await?
2061 .wrapped();
2062
2063 Ok(channel)
2064 }
2065
2066 pub(crate) fn retry_strategy_to_bound(
2067 high_bound: Duration,
2068 exceed: bool,
2069 ) -> impl Iterator<Item = Duration> {
2070 let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS)
2071 .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS))
2072 .map(jitter);
2073
2074 let mut sum = Duration::default();
2075
2076 iter.take_while(move |duration| {
2077 sum += *duration;
2078
2079 if exceed {
2080 sum < high_bound + *duration
2081 } else {
2082 sum < high_bound
2083 }
2084 })
2085 }
2086}
2087
2088macro_rules! for_all_meta_rpc {
2089 ($macro:ident) => {
2090 $macro! {
2091 { cluster_client, add_worker_node, AddWorkerNodeRequest, AddWorkerNodeResponse }
2092 ,{ cluster_client, activate_worker_node, ActivateWorkerNodeRequest, ActivateWorkerNodeResponse }
2093 ,{ cluster_client, delete_worker_node, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse }
2094 ,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
2095 ,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
2096 ,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
2097 ,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
2098 ,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
2099 ,{ stream_client, flush, FlushRequest, FlushResponse }
2100 ,{ stream_client, pause, PauseRequest, PauseResponse }
2101 ,{ stream_client, resume, ResumeRequest, ResumeResponse }
2102 ,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
2103 ,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
2104 ,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
2105 ,{ stream_client, list_streaming_job_states, ListStreamingJobStatesRequest, ListStreamingJobStatesResponse }
2106 ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
2107 ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
2108 ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse }
2109 ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
2110 ,{ stream_client, recover, RecoverRequest, RecoverResponse }
2111 ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse }
2112 ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
2113 ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
2114 ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
2115 ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
2116 ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2117 ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
2118 ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse }
2119 ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse }
2120 ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse }
2121 ,{ ddl_client, create_sink, CreateSinkRequest, CreateSinkResponse }
2122 ,{ ddl_client, create_subscription, CreateSubscriptionRequest, CreateSubscriptionResponse }
2123 ,{ ddl_client, create_schema, CreateSchemaRequest, CreateSchemaResponse }
2124 ,{ ddl_client, create_database, CreateDatabaseRequest, CreateDatabaseResponse }
2125 ,{ ddl_client, create_secret, CreateSecretRequest, CreateSecretResponse }
2126 ,{ ddl_client, create_index, CreateIndexRequest, CreateIndexResponse }
2127 ,{ ddl_client, create_function, CreateFunctionRequest, CreateFunctionResponse }
2128 ,{ ddl_client, drop_table, DropTableRequest, DropTableResponse }
2129 ,{ ddl_client, drop_materialized_view, DropMaterializedViewRequest, DropMaterializedViewResponse }
2130 ,{ ddl_client, drop_view, DropViewRequest, DropViewResponse }
2131 ,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
2132 ,{ ddl_client, drop_secret, DropSecretRequest, DropSecretResponse}
2133 ,{ ddl_client, drop_sink, DropSinkRequest, DropSinkResponse }
2134 ,{ ddl_client, drop_subscription, DropSubscriptionRequest, DropSubscriptionResponse }
2135 ,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
2136 ,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
2137 ,{ ddl_client, drop_index, DropIndexRequest, DropIndexResponse }
2138 ,{ ddl_client, drop_function, DropFunctionRequest, DropFunctionResponse }
2139 ,{ ddl_client, replace_job_plan, ReplaceJobPlanRequest, ReplaceJobPlanResponse }
2140 ,{ ddl_client, alter_source, AlterSourceRequest, AlterSourceResponse }
2141 ,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
2142 ,{ ddl_client, get_ddl_progress, GetDdlProgressRequest, GetDdlProgressResponse }
2143 ,{ ddl_client, create_connection, CreateConnectionRequest, CreateConnectionResponse }
2144 ,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
2145 ,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
2146 ,{ ddl_client, comment_on, CommentOnRequest, CommentOnResponse }
2147 ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
2148 ,{ ddl_client, wait, WaitRequest, WaitResponse }
2149 ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse }
2150 ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse }
2151 ,{ ddl_client, alter_secret, AlterSecretRequest, AlterSecretResponse }
2152 ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
2153 ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
2154 ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
2155 ,{ hummock_client, list_version_deltas, ListVersionDeltasRequest, ListVersionDeltasResponse }
2156 ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse }
2157 ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse }
2158 ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse }
2159 ,{ hummock_client, get_new_sst_ids, GetNewSstIdsRequest, GetNewSstIdsResponse }
2160 ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse }
2161 ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse }
2162 ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse }
2163 ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
2164 ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
2165 ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
2166 ,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
2167 ,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
2168 ,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
2169 ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
2170 ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
2171 ,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
2172 ,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
2173 ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
2174 ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
2175 ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse }
2176 ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
2177 ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
2178 ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
2179 ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
2180 ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
2181 ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
2182 ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
2183 ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
2184 ,{ user_client, drop_user, DropUserRequest, DropUserResponse }
2185 ,{ user_client, grant_privilege, GrantPrivilegeRequest, GrantPrivilegeResponse }
2186 ,{ user_client, revoke_privilege, RevokePrivilegeRequest, RevokePrivilegeResponse }
2187 ,{ scale_client, get_cluster_info, GetClusterInfoRequest, GetClusterInfoResponse }
2188 ,{ scale_client, reschedule, RescheduleRequest, RescheduleResponse }
2189 ,{ notification_client, subscribe, SubscribeRequest, Streaming<SubscribeResponse> }
2190 ,{ backup_client, backup_meta, BackupMetaRequest, BackupMetaResponse }
2191 ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse }
2192 ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse}
2193 ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse}
2194 ,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
2195 ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
2196 ,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
2197 ,{ session_params_client, get_session_params, GetSessionParamsRequest, GetSessionParamsResponse }
2198 ,{ session_params_client, set_session_param, SetSessionParamRequest, SetSessionParamResponse }
2199 ,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
2200 ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse }
2201 ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse }
2202 ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse }
2203 ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse }
2204 }
2205 };
2206}
2207
2208impl GrpcMetaClient {
2209 async fn refresh_client_if_needed(&self, code: Code) {
2210 if matches!(
2211 code,
2212 Code::Unknown | Code::Unimplemented | Code::Unavailable
2213 ) {
2214 tracing::debug!("matching tonic code {}", code);
2215 let (result_sender, result_receiver) = oneshot::channel();
2216 if self
2217 .member_monitor_event_sender
2218 .try_send(result_sender)
2219 .is_ok()
2220 {
2221 if let Ok(Err(e)) = result_receiver.await {
2222 tracing::warn!(error = %e.as_report(), "force refresh meta client failed");
2223 }
2224 } else {
2225 tracing::debug!("skipping the current refresh, somewhere else is already doing it")
2226 }
2227 }
2228 }
2229}
2230
// Inherent methods of `GrpcMetaClient` produced by macro expansion.
impl GrpcMetaClient {
    // Passes `meta_rpc_client_method_impl` to `for_all_meta_rpc!`.
    // NOTE(review): presumably this emits one method per
    // `{ client, rpc, Request, Response }` entry in the `for_all_meta_rpc!` table;
    // confirm against the two macro definitions, which are not fully visible here.
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}