risingwave_common/util/
cluster_limit.rs1use std::collections::HashMap;
16
17use risingwave_pb::meta::actor_count_per_parallelism::PbWorkerActorCount;
18use risingwave_pb::meta::cluster_limit::PbLimit;
19use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit};
20pub enum ClusterLimit {
21 ActorCount(ActorCountPerParallelism),
22}
23
24impl From<ClusterLimit> for PbClusterLimit {
25 fn from(limit: ClusterLimit) -> Self {
26 match limit {
27 ClusterLimit::ActorCount(actor_count_per_parallelism) => PbClusterLimit {
28 limit: Some(PbLimit::ActorCount(actor_count_per_parallelism.into())),
29 },
30 }
31 }
32}
33
34impl From<PbClusterLimit> for ClusterLimit {
35 fn from(pb_limit: PbClusterLimit) -> Self {
36 match pb_limit.limit.unwrap() {
37 PbLimit::ActorCount(actor_count_per_parallelism) => {
38 ClusterLimit::ActorCount(actor_count_per_parallelism.into())
39 }
40 }
41 }
42}
43
44#[derive(Debug)]
45pub struct WorkerActorCount {
46 pub actor_count: usize,
47 pub parallelism: usize,
48}
49
50impl From<WorkerActorCount> for PbWorkerActorCount {
51 fn from(worker_actor_count: WorkerActorCount) -> Self {
52 PbWorkerActorCount {
53 actor_count: worker_actor_count.actor_count as u64,
54 parallelism: worker_actor_count.parallelism as u64,
55 }
56 }
57}
58
59impl From<PbWorkerActorCount> for WorkerActorCount {
60 fn from(pb_worker_actor_count: PbWorkerActorCount) -> Self {
61 WorkerActorCount {
62 actor_count: pb_worker_actor_count.actor_count as usize,
63 parallelism: pb_worker_actor_count.parallelism as usize,
64 }
65 }
66}
67
68pub struct ActorCountPerParallelism {
69 pub worker_id_to_actor_count: HashMap<u32, WorkerActorCount>,
70 pub hard_limit: usize,
71 pub soft_limit: usize,
72}
73
74impl From<ActorCountPerParallelism> for PbActorCountPerParallelism {
75 fn from(actor_count_per_parallelism: ActorCountPerParallelism) -> Self {
76 PbActorCountPerParallelism {
77 worker_id_to_actor_count: actor_count_per_parallelism
78 .worker_id_to_actor_count
79 .into_iter()
80 .map(|(k, v)| (k, v.into()))
81 .collect(),
82 hard_limit: actor_count_per_parallelism.hard_limit as u64,
83 soft_limit: actor_count_per_parallelism.soft_limit as u64,
84 }
85 }
86}
87
88impl From<PbActorCountPerParallelism> for ActorCountPerParallelism {
89 fn from(pb_actor_count_per_parallelism: PbActorCountPerParallelism) -> Self {
90 ActorCountPerParallelism {
91 worker_id_to_actor_count: pb_actor_count_per_parallelism
92 .worker_id_to_actor_count
93 .into_iter()
94 .map(|(k, v)| (k, v.into()))
95 .collect(),
96 hard_limit: pb_actor_count_per_parallelism.hard_limit as usize,
97 soft_limit: pb_actor_count_per_parallelism.soft_limit as usize,
98 }
99 }
100}
101
102impl ActorCountPerParallelism {
103 pub fn exceed_hard_limit(&self) -> bool {
104 self.worker_id_to_actor_count
105 .values()
106 .any(|v| v.actor_count > self.hard_limit.saturating_mul(v.parallelism))
107 }
108
109 pub fn exceed_soft_limit(&self) -> bool {
110 self.worker_id_to_actor_count
111 .values()
112 .any(|v| v.actor_count > self.soft_limit.saturating_mul(v.parallelism))
113 }
114
115 pub fn exceed_limit(&self) -> bool {
116 self.exceed_soft_limit() || self.exceed_hard_limit()
117 }
118}