// risingwave_ctl/cmd_impl/meta/reschedule.rs

use std::collections::{HashMap, HashSet};
use std::process::exit;

use anyhow::{Result, anyhow};
use inquire::Confirm;
use itertools::Itertools;
use regex::Regex;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::{GetClusterInfoResponse, PbWorkerReschedule};
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

use crate::CtlContext;

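/// Contents of a reschedule plan file: the plan revision plus a per-fragment
/// [`WorkerReschedulePlan`], deserialized from YAML by [`reschedule`].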
#[derive(Serialize, Deserialize, Debug)]
pub struct ReschedulePayload {
    #[serde(rename = "reschedule_revision")]
    pub reschedule_revision: u64,

    #[serde(rename = "reschedule_plan")]
    pub worker_reschedule_plan: HashMap<u32, WorkerReschedulePlan>,
}

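/// Per-fragment plan: the signed change in actor count on each worker
/// (positive to add actors, negative to remove them).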
#[derive(Serialize, Deserialize, Debug)]
pub struct WorkerReschedulePlan {
    #[serde(rename = "actor_count_diff")]
    pub actor_count_diff: HashMap<WorkerId, i32>,
}

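/// Reschedule input as provided by the caller: either an inline plan string or
/// the path to a plan file.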
#[derive(Debug)]
pub enum RescheduleInput {
    String(String),
    FilePath(String),
}

impl From<WorkerReschedulePlan> for PbWorkerReschedule {
    fn from(value: WorkerReschedulePlan) -> Self {
        let WorkerReschedulePlan { actor_count_diff } = value;

        PbWorkerReschedule {
            worker_actor_diff: actor_count_diff
                .into_iter()
                .map(|(k, v)| (k as _, v as _))
                .collect(),
        }
    }
}

impl From<PbWorkerReschedule> for WorkerReschedulePlan {
    fn from(value: PbWorkerReschedule) -> Self {
        let PbWorkerReschedule {
            worker_actor_diff: actor_count_diff,
        } = value;

        WorkerReschedulePlan {
            actor_count_diff: actor_count_diff
                .into_iter()
                .map(|(k, v)| (k as _, v as _))
                .collect(),
        }
    }
}

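/// Prints and, unless `dry_run` is set, applies a worker-level reschedule plan.
///
/// Exactly one input form is expected: an inline `plan` string together with a
/// `revision`, or a YAML plan file given via `from` that carries both (see
/// [`ReschedulePayload`]).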
pub async fn reschedule(
    context: &CtlContext,
    plan: Option<String>,
    revision: Option<u64>,
    from: Option<String>,
    dry_run: bool,
    resolve_no_shuffle: bool,
) -> Result<()> {
    let meta_client = context.meta_client().await?;

    let (reschedules, revision) = match (plan, revision, from) {
        (Some(plan), Some(revision), None) => (parse_plan(plan)?, revision),
        (None, None, Some(path)) => {
            let file = std::fs::File::open(path)?;
            let ReschedulePayload {
                reschedule_revision,
                worker_reschedule_plan,
            } = serde_yaml::from_reader(file)?;
            (
                worker_reschedule_plan
                    .into_iter()
                    .map(|(fragment_id, worker_reschedule_plan)| {
                        (fragment_id, worker_reschedule_plan.into())
                    })
                    .collect(),
                reschedule_revision,
            )
        }
        _ => unreachable!(),
    };

    if reschedules.is_empty() {
        return Ok(());
    }

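    // Show the per-fragment worker/actor changes before (optionally) applying them.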
    for (fragment_id, reschedule) in &reschedules {
        println!("For fragment #{}", fragment_id);
        if !reschedule.get_worker_actor_diff().is_empty() {
            println!("\tChange: {:?}", reschedule.get_worker_actor_diff());
        }

        println!();
    }

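    // Unless this is a dry run, submit the plan to the meta node at the given revision.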
    if !dry_run {
        println!("---------------------------");
        let (success, revision) = meta_client
            .reschedule(reschedules, revision, resolve_no_shuffle)
            .await?;

        if !success {
            println!(
                "Reschedule failed, please check the plan or the revision, current revision is {}",
                revision
            );

            return Err(anyhow!("reschedule failed"));
        }

        println!("Reschedule success, current revision is {}", revision);
    }

    Ok(())
}

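/// Parses an inline plan string of the form `fragment_id:[worker_id:count,...];...`,
/// e.g. `100:[1:+1,2:-1];101:[3:2]`, where `count` is the signed change in actor
/// count on that worker. Whitespace is ignored, and a duplicated worker id within
/// one fragment is rejected.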
fn parse_plan(mut plan: String) -> Result<HashMap<u32, PbWorkerReschedule>> {
    let mut reschedules = HashMap::new();
    let regex = Regex::new(r"^(\d+):\[((?:\d+:[+-]?\d+,?)+)]$")?;
    plan.retain(|c| !c.is_whitespace());

    for fragment_reschedule_plan in plan.split(';') {
        if fragment_reschedule_plan.is_empty() {
            continue;
        }

        let captures = regex
            .captures(fragment_reschedule_plan)
            .ok_or_else(|| anyhow!("plan \"{}\" format illegal", fragment_reschedule_plan))?;

        let fragment_id = captures
            .get(1)
            .and_then(|mat| mat.as_str().parse::<u32>().ok())
            .ok_or_else(|| anyhow!("plan \"{}\" does not have a valid fragment id", plan))?;

        let worker_changes: Vec<&str> = captures[2].split(',').collect();

        let mut worker_actor_diff = HashMap::new();
        for worker_change in &worker_changes {
            let (worker_id, count) = worker_change
                .split(':')
                .map(|v| v.parse::<i32>().unwrap())
                .collect_tuple::<(_, _)>()
                .unwrap();

            if let Some(dup_change) = worker_actor_diff.insert(worker_id as u32, count) {
                anyhow::bail!(
                    "duplicate worker id {worker_id} in plan, prev {worker_id} -> {dup_change}",
                );
            }
        }

        if !worker_actor_diff.is_empty() {
            reschedules.insert(fragment_id, PbWorkerReschedule { worker_actor_diff });
        }
    }
    Ok(reschedules)
}

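/// Unregisters the given workers (identified by numeric id or `host:port` address)
/// from the meta node.
///
/// With `check_fragment_occupied`, the command aborts if any target worker still
/// hosts actors of some fragment; `yes` skips the interactive confirmation and
/// `ignore_not_found` downgrades unknown workers to warnings instead of errors.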
pub async fn unregister_workers(
    context: &CtlContext,
    workers: Vec<String>,
    yes: bool,
    ignore_not_found: bool,
    check_fragment_occupied: bool,
) -> Result<()> {
    let meta_client = context.meta_client().await?;

    let GetClusterInfoResponse {
        worker_nodes,
        table_fragments: all_table_fragments,
        ..
    } = match meta_client.get_cluster_info().await {
        Ok(info) => info,
        Err(e) => {
            println!("Failed to get cluster info: {}", e.as_report());
            exit(1);
        }
    };

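    // Allow workers to be addressed by `host:port` as well as by numeric id.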
    let worker_index_by_host: HashMap<_, _> = worker_nodes
        .iter()
        .map(|worker| {
            let host = worker.get_host().expect("host should not be empty");
            (format!("{}:{}", host.host, host.port), worker.id)
        })
        .collect();

    let mut target_worker_ids: HashSet<_> = HashSet::new();

    let worker_ids: HashSet<_> = worker_nodes.iter().map(|worker| worker.id).collect();

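    // Resolve each input to a known worker id; warn when two inputs name the same worker.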
    for worker in workers {
        let worker_id = worker
            .parse::<u32>()
            .ok()
            .or_else(|| worker_index_by_host.get(&worker).cloned());

        if let Some(worker_id) = worker_id
            && worker_ids.contains(&worker_id)
        {
            if !target_worker_ids.insert(worker_id) {
                println!("Warn: {} and {} are the same worker", worker, worker_id);
            }
        } else {
            if ignore_not_found {
                println!("Warn: worker {} not found, ignored", worker);
                continue;
            }

            println!("Could not find worker {}", worker);
            exit(1);
        }
    }

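    // No valid target worker was resolved: return early with `ignore_not_found`, otherwise abort.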
    if target_worker_ids.is_empty() {
        if ignore_not_found {
            println!("Warn: No worker provided, ignored");
            return Ok(());
        }
        println!("No worker provided");
        exit(1);
    }

    let target_workers = worker_nodes
        .into_iter()
        .filter(|worker| target_worker_ids.contains(&worker.id))
        .collect_vec();

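    // When `check_fragment_occupied` is set, abort if any target worker still hosts actors.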
    for table_fragments in &all_table_fragments {
        for (fragment_id, fragment) in &table_fragments.fragments {
            let occupied_worker_ids: HashSet<_> = fragment
                .actors
                .iter()
                .map(|actor| {
                    table_fragments
                        .actor_status
                        .get(&actor.actor_id)
                        .map(|actor_status| actor_status.worker_id())
                        .unwrap()
                })
                .collect();

            let intersection_worker_ids: HashSet<_> = occupied_worker_ids
                .intersection(&target_worker_ids)
                .collect();

            if check_fragment_occupied && !intersection_worker_ids.is_empty() {
                println!(
                    "worker ids {:?} are still occupied by fragment #{}",
                    intersection_worker_ids, fragment_id
                );
                exit(1);
            }
        }
    }

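    // Ask for interactive confirmation unless `--yes`/`-y` was passed.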
    if !yes {
        match Confirm::new("Will perform actions on the cluster, are you sure?")
            .with_default(false)
            .with_help_message("Use the --yes or -y option to skip this prompt")
            .with_placeholder("no")
            .prompt()
        {
            Ok(true) => println!("Processing..."),
            Ok(false) => {
                println!("Abort.");
                exit(1);
            }
            Err(_) => {
                println!("Error with questionnaire, try again later");
                exit(-1);
            }
        }
    }

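    // Unregister each target worker via the meta client, skipping workers without a host.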
    for WorkerNode { id, host, .. } in target_workers {
        let host = match host {
            None => {
                println!("Worker #{} does not have a host, skipping", id);
                continue;
            }
            Some(host) => host,
        };

        println!("Unregistering worker #{}, address: {:?}", id, host);
        if let Err(e) = meta_client.delete_worker_node(host).await {
            println!("Failed to delete worker #{}: {}", id, e.as_report());
        };
    }

    println!("Done");

    Ok(())
}