risingwave_common/util/
cluster_limit.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_pb::id::WorkerId;
18use risingwave_pb::meta::actor_count_per_parallelism::PbWorkerActorCount;
19use risingwave_pb::meta::cluster_limit::PbLimit;
20use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit};
/// In-memory counterpart of the protobuf [`PbClusterLimit`] message.
///
/// Conversions to and from [`PbClusterLimit`] are provided through `From`
/// implementations in this file.
pub enum ClusterLimit {
    /// Actor-count limit data, carried as an [`ActorCountPerParallelism`].
    ActorCount(ActorCountPerParallelism),
}
24
25impl From<ClusterLimit> for PbClusterLimit {
26    fn from(limit: ClusterLimit) -> Self {
27        match limit {
28            ClusterLimit::ActorCount(actor_count_per_parallelism) => PbClusterLimit {
29                limit: Some(PbLimit::ActorCount(actor_count_per_parallelism.into())),
30            },
31        }
32    }
33}
34
35impl From<PbClusterLimit> for ClusterLimit {
36    fn from(pb_limit: PbClusterLimit) -> Self {
37        match pb_limit.limit.unwrap() {
38            PbLimit::ActorCount(actor_count_per_parallelism) => {
39                ClusterLimit::ActorCount(actor_count_per_parallelism.into())
40            }
41        }
42    }
43}
44
/// Actor count and parallelism recorded for a single worker.
///
/// This is a plain pair of counters, so it derives the usual value-type
/// traits (`Clone`, `Copy`, `PartialEq`, `Eq`) in addition to `Debug`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WorkerActorCount {
    /// Number of actors counted for the worker.
    pub actor_count: usize,
    /// Parallelism of the worker; the limit checks multiply the
    /// per-parallelism limit by this value.
    pub parallelism: usize,
}
50
51impl From<WorkerActorCount> for PbWorkerActorCount {
52    fn from(worker_actor_count: WorkerActorCount) -> Self {
53        PbWorkerActorCount {
54            actor_count: worker_actor_count.actor_count as u64,
55            parallelism: worker_actor_count.parallelism as u64,
56        }
57    }
58}
59
60impl From<PbWorkerActorCount> for WorkerActorCount {
61    fn from(pb_worker_actor_count: PbWorkerActorCount) -> Self {
62        WorkerActorCount {
63            actor_count: pb_worker_actor_count.actor_count as usize,
64            parallelism: pb_worker_actor_count.parallelism as usize,
65        }
66    }
67}
68
/// Per-worker actor counts together with the hard and soft limits they are
/// checked against.
///
/// `exceed_hard_limit` / `exceed_soft_limit` report whether any worker's
/// `actor_count` is greater than the respective limit multiplied by that
/// worker's `parallelism`.
pub struct ActorCountPerParallelism {
    /// Actor count and parallelism for each worker, keyed by worker id.
    pub worker_id_to_actor_count: HashMap<WorkerId, WorkerActorCount>,
    /// Hard limit, scaled by each worker's parallelism when checked.
    pub hard_limit: usize,
    /// Soft limit, scaled by each worker's parallelism when checked.
    pub soft_limit: usize,
}
74
75impl From<ActorCountPerParallelism> for PbActorCountPerParallelism {
76    fn from(actor_count_per_parallelism: ActorCountPerParallelism) -> Self {
77        PbActorCountPerParallelism {
78            worker_id_to_actor_count: actor_count_per_parallelism
79                .worker_id_to_actor_count
80                .into_iter()
81                .map(|(k, v)| (k, v.into()))
82                .collect(),
83            hard_limit: actor_count_per_parallelism.hard_limit as u64,
84            soft_limit: actor_count_per_parallelism.soft_limit as u64,
85        }
86    }
87}
88
89impl From<PbActorCountPerParallelism> for ActorCountPerParallelism {
90    fn from(pb_actor_count_per_parallelism: PbActorCountPerParallelism) -> Self {
91        ActorCountPerParallelism {
92            worker_id_to_actor_count: pb_actor_count_per_parallelism
93                .worker_id_to_actor_count
94                .into_iter()
95                .map(|(k, v)| (k, v.into()))
96                .collect(),
97            hard_limit: pb_actor_count_per_parallelism.hard_limit as usize,
98            soft_limit: pb_actor_count_per_parallelism.soft_limit as usize,
99        }
100    }
101}
102
103impl ActorCountPerParallelism {
104    pub fn exceed_hard_limit(&self) -> bool {
105        self.worker_id_to_actor_count
106            .values()
107            .any(|v| v.actor_count > self.hard_limit.saturating_mul(v.parallelism))
108    }
109
110    pub fn exceed_soft_limit(&self) -> bool {
111        self.worker_id_to_actor_count
112            .values()
113            .any(|v| v.actor_count > self.soft_limit.saturating_mul(v.parallelism))
114    }
115
116    pub fn exceed_limit(&self) -> bool {
117        self.exceed_soft_limit() || self.exceed_hard_limit()
118    }
119}