risingwave_meta_service/
cluster_limit_service.rs1use std::collections::HashMap;
16
17use risingwave_common::util::cluster_limit::{
18 ActorCountPerParallelism, ClusterLimit, WorkerActorCount,
19};
20use risingwave_meta::MetaResult;
21use risingwave_meta::manager::{MetaSrvEnv, MetadataManager};
22use risingwave_meta_model::WorkerId;
23use risingwave_pb::common::WorkerType;
24use risingwave_pb::common::worker_node::State;
25use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitService;
26use risingwave_pb::meta::{GetClusterLimitsRequest, GetClusterLimitsResponse};
27use tonic::{Request, Response, Status};
28
29#[derive(Clone)]
30pub struct ClusterLimitServiceImpl {
31 env: MetaSrvEnv,
32 metadata_manager: MetadataManager,
33}
34
35impl ClusterLimitServiceImpl {
36 pub fn new(env: MetaSrvEnv, metadata_manager: MetadataManager) -> Self {
37 ClusterLimitServiceImpl {
38 env,
39 metadata_manager,
40 }
41 }
42
43 async fn get_active_actor_limit(&self) -> MetaResult<Option<ClusterLimit>> {
44 let (soft_limit, hard_limit) = (
45 self.env.opts.actor_cnt_per_worker_parallelism_soft_limit,
46 self.env.opts.actor_cnt_per_worker_parallelism_hard_limit,
47 );
48
49 let running_worker_parallelism: HashMap<WorkerId, usize> = self
50 .metadata_manager
51 .list_worker_node(Some(WorkerType::ComputeNode), Some(State::Running))
52 .await?
53 .into_iter()
54 .map(|e| (e.id as _, e.compute_node_parallelism()))
55 .collect();
56 let worker_actor_count: HashMap<u32, WorkerActorCount> = self
57 .metadata_manager
58 .worker_actor_count()?
59 .into_iter()
60 .filter_map(|(worker_id, actor_count)| {
61 running_worker_parallelism
62 .get(&worker_id)
63 .map(|parallelism| {
64 (
65 worker_id as _,
66 WorkerActorCount {
67 actor_count,
68 parallelism: *parallelism,
69 },
70 )
71 })
72 })
73 .collect();
74
75 let limit = ActorCountPerParallelism {
76 worker_id_to_actor_count: worker_actor_count,
77 hard_limit,
78 soft_limit,
79 };
80
81 if limit.exceed_limit() {
82 Ok(Some(ClusterLimit::ActorCount(limit)))
83 } else {
84 Ok(None)
85 }
86 }
87}
88
89#[async_trait::async_trait]
90impl ClusterLimitService for ClusterLimitServiceImpl {
91 async fn get_cluster_limits(
92 &self,
93 _request: Request<GetClusterLimitsRequest>,
94 ) -> Result<Response<GetClusterLimitsResponse>, Status> {
95 match self.get_active_actor_limit().await {
97 Ok(Some(limit)) => Ok(Response::new(GetClusterLimitsResponse {
98 active_limits: vec![limit.into()],
99 })),
100 Ok(None) => Ok(Response::new(GetClusterLimitsResponse {
101 active_limits: vec![],
102 })),
103 Err(e) => Err(e.into()),
104 }
105 }
106}