risingwave_meta_service/
cluster_limit_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::util::cluster_limit::{
18    ActorCountPerParallelism, ClusterLimit, WorkerActorCount,
19};
20use risingwave_meta::MetaResult;
21use risingwave_meta::manager::{MetaSrvEnv, MetadataManager};
22use risingwave_meta_model::WorkerId;
23use risingwave_pb::common::WorkerType;
24use risingwave_pb::common::worker_node::State;
25use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitService;
26use risingwave_pb::meta::{GetClusterLimitsRequest, GetClusterLimitsResponse};
27use tonic::{Request, Response, Status};
28
/// gRPC service implementation backing the meta `ClusterLimitService`.
///
/// Holds shared handles to the meta server environment (for the configured
/// limit options) and the metadata manager (for live worker/actor state).
#[derive(Clone)]
pub struct ClusterLimitServiceImpl {
    // Meta server environment; `env.opts` carries the configured
    // soft/hard actor-count-per-parallelism limits read by this service.
    env: MetaSrvEnv,
    // Used to list running compute nodes and to count actors per worker.
    metadata_manager: MetadataManager,
}
34
35impl ClusterLimitServiceImpl {
36    pub fn new(env: MetaSrvEnv, metadata_manager: MetadataManager) -> Self {
37        ClusterLimitServiceImpl {
38            env,
39            metadata_manager,
40        }
41    }
42
43    async fn get_active_actor_limit(&self) -> MetaResult<Option<ClusterLimit>> {
44        let (soft_limit, hard_limit) = (
45            self.env.opts.actor_cnt_per_worker_parallelism_soft_limit,
46            self.env.opts.actor_cnt_per_worker_parallelism_hard_limit,
47        );
48
49        let running_worker_parallelism: HashMap<WorkerId, usize> = self
50            .metadata_manager
51            .list_worker_node(Some(WorkerType::ComputeNode), Some(State::Running))
52            .await?
53            .into_iter()
54            .map(|e| (e.id as _, e.compute_node_parallelism()))
55            .collect();
56        let worker_actor_count: HashMap<u32, WorkerActorCount> = self
57            .metadata_manager
58            .worker_actor_count()
59            .await?
60            .into_iter()
61            .filter_map(|(worker_id, actor_count)| {
62                running_worker_parallelism
63                    .get(&worker_id)
64                    .map(|parallelism| {
65                        (
66                            worker_id as _,
67                            WorkerActorCount {
68                                actor_count,
69                                parallelism: *parallelism,
70                            },
71                        )
72                    })
73            })
74            .collect();
75
76        let limit = ActorCountPerParallelism {
77            worker_id_to_actor_count: worker_actor_count,
78            hard_limit,
79            soft_limit,
80        };
81
82        if limit.exceed_limit() {
83            Ok(Some(ClusterLimit::ActorCount(limit)))
84        } else {
85            Ok(None)
86        }
87    }
88}
89
90#[async_trait::async_trait]
91impl ClusterLimitService for ClusterLimitServiceImpl {
92    #[cfg_attr(coverage, coverage(off))]
93    async fn get_cluster_limits(
94        &self,
95        _request: Request<GetClusterLimitsRequest>,
96    ) -> Result<Response<GetClusterLimitsResponse>, Status> {
97        // TODO: support more limits
98        match self.get_active_actor_limit().await {
99            Ok(Some(limit)) => Ok(Response::new(GetClusterLimitsResponse {
100                active_limits: vec![limit.into()],
101            })),
102            Ok(None) => Ok(Response::new(GetClusterLimitsResponse {
103                active_limits: vec![],
104            })),
105            Err(e) => Err(e.into()),
106        }
107    }
108}