risingwave_meta_service/
cluster_limit_service.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::util::cluster_limit::{
18    ActorCountPerParallelism, ClusterLimit, WorkerActorCount,
19};
20use risingwave_meta::MetaResult;
21use risingwave_meta::manager::{MetaSrvEnv, MetadataManager};
22use risingwave_meta_model::WorkerId;
23use risingwave_pb::common::WorkerType;
24use risingwave_pb::common::worker_node::State;
25use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitService;
26use risingwave_pb::meta::{GetClusterLimitsRequest, GetClusterLimitsResponse};
27use tonic::{Request, Response, Status};
28
29#[derive(Clone)]
30pub struct ClusterLimitServiceImpl {
31    env: MetaSrvEnv,
32    metadata_manager: MetadataManager,
33}
34
35impl ClusterLimitServiceImpl {
36    pub fn new(env: MetaSrvEnv, metadata_manager: MetadataManager) -> Self {
37        ClusterLimitServiceImpl {
38            env,
39            metadata_manager,
40        }
41    }
42
43    async fn get_active_actor_limit(&self) -> MetaResult<Option<ClusterLimit>> {
44        let (soft_limit, hard_limit) = (
45            self.env.opts.actor_cnt_per_worker_parallelism_soft_limit,
46            self.env.opts.actor_cnt_per_worker_parallelism_hard_limit,
47        );
48
49        let running_worker_parallelism: HashMap<WorkerId, usize> = self
50            .metadata_manager
51            .list_worker_node(Some(WorkerType::ComputeNode), Some(State::Running))
52            .await?
53            .into_iter()
54            .map(|e| (e.id as _, e.compute_node_parallelism()))
55            .collect();
56        let worker_actor_count: HashMap<u32, WorkerActorCount> = self
57            .metadata_manager
58            .worker_actor_count()?
59            .into_iter()
60            .filter_map(|(worker_id, actor_count)| {
61                running_worker_parallelism
62                    .get(&worker_id)
63                    .map(|parallelism| {
64                        (
65                            worker_id as _,
66                            WorkerActorCount {
67                                actor_count,
68                                parallelism: *parallelism,
69                            },
70                        )
71                    })
72            })
73            .collect();
74
75        let limit = ActorCountPerParallelism {
76            worker_id_to_actor_count: worker_actor_count,
77            hard_limit,
78            soft_limit,
79        };
80
81        if limit.exceed_limit() {
82            Ok(Some(ClusterLimit::ActorCount(limit)))
83        } else {
84            Ok(None)
85        }
86    }
87}
88
89#[async_trait::async_trait]
90impl ClusterLimitService for ClusterLimitServiceImpl {
91    async fn get_cluster_limits(
92        &self,
93        _request: Request<GetClusterLimitsRequest>,
94    ) -> Result<Response<GetClusterLimitsResponse>, Status> {
95        // TODO: support more limits
96        match self.get_active_actor_limit().await {
97            Ok(Some(limit)) => Ok(Response::new(GetClusterLimitsResponse {
98                active_limits: vec![limit.into()],
99            })),
100            Ok(None) => Ok(Response::new(GetClusterLimitsResponse {
101                active_limits: vec![],
102            })),
103            Err(e) => Err(e.into()),
104        }
105    }
106}