risingwave_common/util/
cluster_limit.rs1use std::collections::HashMap;
16
17use risingwave_pb::id::WorkerId;
18use risingwave_pb::meta::actor_count_per_parallelism::PbWorkerActorCount;
19use risingwave_pb::meta::cluster_limit::PbLimit;
20use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit};
21pub enum ClusterLimit {
22 ActorCount(ActorCountPerParallelism),
23}
24
25impl From<ClusterLimit> for PbClusterLimit {
26 fn from(limit: ClusterLimit) -> Self {
27 match limit {
28 ClusterLimit::ActorCount(actor_count_per_parallelism) => PbClusterLimit {
29 limit: Some(PbLimit::ActorCount(actor_count_per_parallelism.into())),
30 },
31 }
32 }
33}
34
35impl From<PbClusterLimit> for ClusterLimit {
36 fn from(pb_limit: PbClusterLimit) -> Self {
37 match pb_limit.limit.unwrap() {
38 PbLimit::ActorCount(actor_count_per_parallelism) => {
39 ClusterLimit::ActorCount(actor_count_per_parallelism.into())
40 }
41 }
42 }
43}
44
/// Actor count and parallelism recorded for a single worker.
///
/// The limit checks on `ActorCountPerParallelism` compare `actor_count`
/// against a per-parallelism limit multiplied by `parallelism`.
#[derive(Debug)]
pub struct WorkerActorCount {
    /// Actor count of the worker; the left-hand side of the limit comparison.
    pub actor_count: usize,
    /// Parallelism of the worker; scales the per-parallelism limit.
    pub parallelism: usize,
}
50
51impl From<WorkerActorCount> for PbWorkerActorCount {
52 fn from(worker_actor_count: WorkerActorCount) -> Self {
53 PbWorkerActorCount {
54 actor_count: worker_actor_count.actor_count as u64,
55 parallelism: worker_actor_count.parallelism as u64,
56 }
57 }
58}
59
60impl From<PbWorkerActorCount> for WorkerActorCount {
61 fn from(pb_worker_actor_count: PbWorkerActorCount) -> Self {
62 WorkerActorCount {
63 actor_count: pb_worker_actor_count.actor_count as usize,
64 parallelism: pb_worker_actor_count.parallelism as usize,
65 }
66 }
67}
68
69pub struct ActorCountPerParallelism {
70 pub worker_id_to_actor_count: HashMap<WorkerId, WorkerActorCount>,
71 pub hard_limit: usize,
72 pub soft_limit: usize,
73}
74
75impl From<ActorCountPerParallelism> for PbActorCountPerParallelism {
76 fn from(actor_count_per_parallelism: ActorCountPerParallelism) -> Self {
77 PbActorCountPerParallelism {
78 worker_id_to_actor_count: actor_count_per_parallelism
79 .worker_id_to_actor_count
80 .into_iter()
81 .map(|(k, v)| (k, v.into()))
82 .collect(),
83 hard_limit: actor_count_per_parallelism.hard_limit as u64,
84 soft_limit: actor_count_per_parallelism.soft_limit as u64,
85 }
86 }
87}
88
89impl From<PbActorCountPerParallelism> for ActorCountPerParallelism {
90 fn from(pb_actor_count_per_parallelism: PbActorCountPerParallelism) -> Self {
91 ActorCountPerParallelism {
92 worker_id_to_actor_count: pb_actor_count_per_parallelism
93 .worker_id_to_actor_count
94 .into_iter()
95 .map(|(k, v)| (k, v.into()))
96 .collect(),
97 hard_limit: pb_actor_count_per_parallelism.hard_limit as usize,
98 soft_limit: pb_actor_count_per_parallelism.soft_limit as usize,
99 }
100 }
101}
102
103impl ActorCountPerParallelism {
104 pub fn exceed_hard_limit(&self) -> bool {
105 self.worker_id_to_actor_count
106 .values()
107 .any(|v| v.actor_count > self.hard_limit.saturating_mul(v.parallelism))
108 }
109
110 pub fn exceed_soft_limit(&self) -> bool {
111 self.worker_id_to_actor_count
112 .values()
113 .any(|v| v.actor_count > self.soft_limit.saturating_mul(v.parallelism))
114 }
115
116 pub fn exceed_limit(&self) -> bool {
117 self.exceed_soft_limit() || self.exceed_hard_limit()
118 }
119}