risingwave_common/util/
cluster_limit.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_pb::meta::actor_count_per_parallelism::PbWorkerActorCount;
18use risingwave_pb::meta::cluster_limit::PbLimit;
19use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit};
20pub enum ClusterLimit {
21    ActorCount(ActorCountPerParallelism),
22}
23
24impl From<ClusterLimit> for PbClusterLimit {
25    fn from(limit: ClusterLimit) -> Self {
26        match limit {
27            ClusterLimit::ActorCount(actor_count_per_parallelism) => PbClusterLimit {
28                limit: Some(PbLimit::ActorCount(actor_count_per_parallelism.into())),
29            },
30        }
31    }
32}
33
34impl From<PbClusterLimit> for ClusterLimit {
35    fn from(pb_limit: PbClusterLimit) -> Self {
36        match pb_limit.limit.unwrap() {
37            PbLimit::ActorCount(actor_count_per_parallelism) => {
38                ClusterLimit::ActorCount(actor_count_per_parallelism.into())
39            }
40        }
41    }
42}
43
/// Actor count and parallelism recorded for one worker.
///
/// Instances are the values of
/// [`ActorCountPerParallelism::worker_id_to_actor_count`].
#[derive(Debug)]
pub struct WorkerActorCount {
    /// Actor count; the limit checks compare it against `limit * parallelism`.
    pub actor_count: usize,
    /// Parallelism; the limit checks multiply the configured limit by it.
    pub parallelism: usize,
}
49
50impl From<WorkerActorCount> for PbWorkerActorCount {
51    fn from(worker_actor_count: WorkerActorCount) -> Self {
52        PbWorkerActorCount {
53            actor_count: worker_actor_count.actor_count as u64,
54            parallelism: worker_actor_count.parallelism as u64,
55        }
56    }
57}
58
59impl From<PbWorkerActorCount> for WorkerActorCount {
60    fn from(pb_worker_actor_count: PbWorkerActorCount) -> Self {
61        WorkerActorCount {
62            actor_count: pb_worker_actor_count.actor_count as usize,
63            parallelism: pb_worker_actor_count.parallelism as usize,
64        }
65    }
66}
67
68pub struct ActorCountPerParallelism {
69    pub worker_id_to_actor_count: HashMap<u32, WorkerActorCount>,
70    pub hard_limit: usize,
71    pub soft_limit: usize,
72}
73
74impl From<ActorCountPerParallelism> for PbActorCountPerParallelism {
75    fn from(actor_count_per_parallelism: ActorCountPerParallelism) -> Self {
76        PbActorCountPerParallelism {
77            worker_id_to_actor_count: actor_count_per_parallelism
78                .worker_id_to_actor_count
79                .into_iter()
80                .map(|(k, v)| (k, v.into()))
81                .collect(),
82            hard_limit: actor_count_per_parallelism.hard_limit as u64,
83            soft_limit: actor_count_per_parallelism.soft_limit as u64,
84        }
85    }
86}
87
88impl From<PbActorCountPerParallelism> for ActorCountPerParallelism {
89    fn from(pb_actor_count_per_parallelism: PbActorCountPerParallelism) -> Self {
90        ActorCountPerParallelism {
91            worker_id_to_actor_count: pb_actor_count_per_parallelism
92                .worker_id_to_actor_count
93                .into_iter()
94                .map(|(k, v)| (k, v.into()))
95                .collect(),
96            hard_limit: pb_actor_count_per_parallelism.hard_limit as usize,
97            soft_limit: pb_actor_count_per_parallelism.soft_limit as usize,
98        }
99    }
100}
101
102impl ActorCountPerParallelism {
103    pub fn exceed_hard_limit(&self) -> bool {
104        self.worker_id_to_actor_count
105            .values()
106            .any(|v| v.actor_count > self.hard_limit.saturating_mul(v.parallelism))
107    }
108
109    pub fn exceed_soft_limit(&self) -> bool {
110        self.worker_id_to_actor_count
111            .values()
112            .any(|v| v.actor_count > self.soft_limit.saturating_mul(v.parallelism))
113    }
114
115    pub fn exceed_limit(&self) -> bool {
116        self.exceed_soft_limit() || self.exceed_hard_limit()
117    }
118}