risingwave_meta_service/
cluster_limit_service.rs1use std::collections::HashMap;
16
17use risingwave_common::util::cluster_limit::{
18 ActorCountPerParallelism, ClusterLimit, WorkerActorCount,
19};
20use risingwave_meta::MetaResult;
21use risingwave_meta::manager::{MetaSrvEnv, MetadataManager};
22use risingwave_meta_model::WorkerId;
23use risingwave_pb::common::WorkerType;
24use risingwave_pb::common::worker_node::State;
25use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitService;
26use risingwave_pb::meta::{GetClusterLimitsRequest, GetClusterLimitsResponse};
27use tonic::{Request, Response, Status};
28
29#[derive(Clone)]
30pub struct ClusterLimitServiceImpl {
31 env: MetaSrvEnv,
32 metadata_manager: MetadataManager,
33}
34
35impl ClusterLimitServiceImpl {
36 pub fn new(env: MetaSrvEnv, metadata_manager: MetadataManager) -> Self {
37 ClusterLimitServiceImpl {
38 env,
39 metadata_manager,
40 }
41 }
42
43 async fn get_active_actor_limit(&self) -> MetaResult<Option<ClusterLimit>> {
44 let (soft_limit, hard_limit) = (
45 self.env.opts.actor_cnt_per_worker_parallelism_soft_limit,
46 self.env.opts.actor_cnt_per_worker_parallelism_hard_limit,
47 );
48
49 let running_worker_parallelism: HashMap<WorkerId, usize> = self
50 .metadata_manager
51 .list_worker_node(Some(WorkerType::ComputeNode), Some(State::Running))
52 .await?
53 .into_iter()
54 .map(|e| (e.id as _, e.compute_node_parallelism()))
55 .collect();
56 let worker_actor_count: HashMap<u32, WorkerActorCount> = self
57 .metadata_manager
58 .worker_actor_count()
59 .await?
60 .into_iter()
61 .filter_map(|(worker_id, actor_count)| {
62 running_worker_parallelism
63 .get(&worker_id)
64 .map(|parallelism| {
65 (
66 worker_id as _,
67 WorkerActorCount {
68 actor_count,
69 parallelism: *parallelism,
70 },
71 )
72 })
73 })
74 .collect();
75
76 let limit = ActorCountPerParallelism {
77 worker_id_to_actor_count: worker_actor_count,
78 hard_limit,
79 soft_limit,
80 };
81
82 if limit.exceed_limit() {
83 Ok(Some(ClusterLimit::ActorCount(limit)))
84 } else {
85 Ok(None)
86 }
87 }
88}
89
90#[async_trait::async_trait]
91impl ClusterLimitService for ClusterLimitServiceImpl {
92 #[cfg_attr(coverage, coverage(off))]
93 async fn get_cluster_limits(
94 &self,
95 _request: Request<GetClusterLimitsRequest>,
96 ) -> Result<Response<GetClusterLimitsResponse>, Status> {
97 match self.get_active_actor_limit().await {
99 Ok(Some(limit)) => Ok(Response::new(GetClusterLimitsResponse {
100 active_limits: vec![limit.into()],
101 })),
102 Ok(None) => Ok(Response::new(GetClusterLimitsResponse {
103 active_limits: vec![],
104 })),
105 Err(e) => Err(e.into()),
106 }
107 }
108}