risingwave_ctl/cmd_impl/meta/reschedule.rs

use std::collections::{HashMap, HashSet};
use std::process::exit;

use anyhow::{Result, anyhow};
use inquire::Confirm;
use itertools::Itertools;
use regex::Regex;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::{GetClusterInfoResponse, PbWorkerReschedule};
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

use crate::CtlContext;

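/// Payload deserialized from the YAML file passed via `--from`: a reschedule revision plus a
/// per-fragment map (keyed by fragment id) of worker reschedule plans.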
#[derive(Serialize, Deserialize, Debug)]
pub struct ReschedulePayload {
    #[serde(rename = "reschedule_revision")]
    pub reschedule_revision: u64,

    #[serde(rename = "reschedule_plan")]
    pub worker_reschedule_plan: HashMap<u32, WorkerReschedulePlan>,
}

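/// Per-fragment plan entry: for each worker id, the signed change in the number of actors
/// placed on that worker.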
#[derive(Serialize, Deserialize, Debug)]
pub struct WorkerReschedulePlan {
    #[serde(rename = "actor_count_diff")]
    pub actor_count_diff: HashMap<WorkerId, i32>,
}

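/// How a reschedule plan is supplied: as an inline string or as a path to a YAML file.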
#[derive(Debug)]
pub enum RescheduleInput {
    String(String),
    FilePath(String),
}

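// Conversions between the ctl-side plan type and the protobuf `PbWorkerReschedule`
// exchanged with the meta service.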
impl From<WorkerReschedulePlan> for PbWorkerReschedule {
    fn from(value: WorkerReschedulePlan) -> Self {
        let WorkerReschedulePlan { actor_count_diff } = value;

        PbWorkerReschedule {
            worker_actor_diff: actor_count_diff,
        }
    }
}

impl From<PbWorkerReschedule> for WorkerReschedulePlan {
    fn from(value: PbWorkerReschedule) -> Self {
        let PbWorkerReschedule {
            worker_actor_diff: actor_count_diff,
        } = value;

        WorkerReschedulePlan {
            actor_count_diff: actor_count_diff
                .into_iter()
                .map(|(k, v)| (k, v as _))
                .collect(),
        }
    }
}

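/// Print the per-fragment reschedule plan and, unless `dry_run` is set, submit it to the
/// meta service together with the expected `revision`.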
pub async fn reschedule(
    context: &CtlContext,
    plan: Option<String>,
    revision: Option<u64>,
    from: Option<String>,
    dry_run: bool,
    resolve_no_shuffle: bool,
) -> Result<()> {
    let meta_client = context.meta_client().await?;

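    // The plan comes either inline (`plan` + `revision`) or from a YAML file (`from`); other
    // combinations are assumed to be rejected by argument parsing upstream, hence the
    // `unreachable!()` arm.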
    let (reschedules, revision) = match (plan, revision, from) {
        (Some(plan), Some(revision), None) => (parse_plan(plan)?, revision),
        (None, None, Some(path)) => {
            let file = std::fs::File::open(path)?;
            let ReschedulePayload {
                reschedule_revision,
                worker_reschedule_plan,
            } = serde_yaml::from_reader(file)?;
            (
                worker_reschedule_plan
                    .into_iter()
                    .map(|(fragment_id, worker_reschedule_plan)| {
                        (fragment_id, worker_reschedule_plan.into())
                    })
                    .collect(),
                reschedule_revision,
            )
        }
        _ => unreachable!(),
    };

    if reschedules.is_empty() {
        return Ok(());
    }

    for (fragment_id, reschedule) in &reschedules {
        println!("For fragment #{}", fragment_id);
        if !reschedule.get_worker_actor_diff().is_empty() {
            println!("\tChange: {:?}", reschedule.get_worker_actor_diff());
        }

        println!();
    }

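    // Unless this is a dry run, submit the plan to the meta service and report the new revision.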
    if !dry_run {
        println!("---------------------------");
        let (success, revision) = meta_client
            .reschedule(reschedules, revision, resolve_no_shuffle)
            .await?;

        if !success {
            println!(
                "Reschedule failed, please check the plan or the revision, current revision is {}",
                revision
            );

            return Err(anyhow!("reschedule failed"));
        }

        println!("Reschedule success, current revision is {}", revision);
    }

    Ok(())
}

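/// Parse an inline plan string of the form `fragment_id:[worker_id:diff,...]`, with multiple
/// fragments separated by `;` and whitespace ignored, e.g. `100:[1:+1,2:-1];101:[3:1]`
/// (format as enforced by the regex below).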
fn parse_plan(mut plan: String) -> Result<HashMap<u32, PbWorkerReschedule>> {
    let mut reschedules = HashMap::new();
    let regex = Regex::new(r"^(\d+):\[((?:\d+:[+-]?\d+,?)+)]$")?;
    plan.retain(|c| !c.is_whitespace());

    for fragment_reschedule_plan in plan.split(';') {
        if fragment_reschedule_plan.is_empty() {
            continue;
        }

        let captures = regex
            .captures(fragment_reschedule_plan)
            .ok_or_else(|| anyhow!("plan \"{}\" format illegal", fragment_reschedule_plan))?;

        let fragment_id = captures
            .get(1)
            .and_then(|mat| mat.as_str().parse::<u32>().ok())
            .ok_or_else(|| anyhow!("plan \"{}\" does not have a valid fragment id", plan))?;

        let worker_changes: Vec<&str> = captures[2].split(',').collect();

        let mut worker_actor_diff = HashMap::new();
        for worker_change in &worker_changes {
            let (worker_id, count) = worker_change.split(':').collect_tuple::<(_, _)>().unwrap();
            let worker_id = worker_id.parse().unwrap();
            let count = count.parse().unwrap();

            if let Some(dup_change) = worker_actor_diff.insert(worker_id, count) {
                anyhow::bail!(
                    "duplicate worker id {worker_id} in plan, prev {worker_id} -> {dup_change}",
                );
            }
        }

        if !worker_actor_diff.is_empty() {
            reschedules.insert(fragment_id, PbWorkerReschedule { worker_actor_diff });
        }
    }
    Ok(reschedules)
}

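/// Unregister the given workers (referenced by worker id or `host:port` address) from the
/// cluster, optionally refusing to proceed while any target worker still hosts actors.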
pub async fn unregister_workers(
    context: &CtlContext,
    workers: Vec<String>,
    yes: bool,
    ignore_not_found: bool,
    check_fragment_occupied: bool,
) -> Result<()> {
    let meta_client = context.meta_client().await?;

    let GetClusterInfoResponse {
        worker_nodes,
        table_fragments: all_table_fragments,
        ..
    } = match meta_client.get_cluster_info().await {
        Ok(info) => info,
        Err(e) => {
            println!("Failed to get cluster info: {}", e.as_report());
            exit(1);
        }
    };

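    // Index workers by `host:port` so arguments can name a worker by address as well as by id.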
    let worker_index_by_host: HashMap<_, _> = worker_nodes
        .iter()
        .map(|worker| {
            let host = worker.get_host().expect("host should not be empty");
            (format!("{}:{}", host.host, host.port), worker.id)
        })
        .collect();

    let mut target_worker_ids: HashSet<_> = HashSet::new();

    let worker_ids: HashSet<_> = worker_nodes.iter().map(|worker| worker.id).collect();

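    // Resolve each argument: try it as a numeric worker id first, then as a `host:port` key.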
    for worker in workers {
        let worker_id = worker
            .parse::<WorkerId>()
            .ok()
            .or_else(|| worker_index_by_host.get(&worker).cloned());

        if let Some(worker_id) = worker_id
            && worker_ids.contains(&worker_id)
        {
            if !target_worker_ids.insert(worker_id) {
                println!("Warn: {} and {} are the same worker", worker, worker_id);
            }
        } else {
            if ignore_not_found {
                println!("Warn: worker {} not found, ignored", worker);
                continue;
            }

            println!("Could not find worker {}", worker);
            exit(1);
        }
    }

    if target_worker_ids.is_empty() {
        if ignore_not_found {
            println!("Warn: No worker provided, ignored");
            return Ok(());
        }
        println!("No worker provided");
        exit(1);
    }

    let target_workers = worker_nodes
        .into_iter()
        .filter(|worker| target_worker_ids.contains(&worker.id))
        .collect_vec();

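    // Check whether any target worker still hosts actors of some fragment; with
    // `check_fragment_occupied` set, bail out instead of unregistering such a worker.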
    for table_fragments in &all_table_fragments {
        for (fragment_id, fragment) in &table_fragments.fragments {
            let occupied_worker_ids: HashSet<_> = fragment
                .actors
                .iter()
                .map(|actor| {
                    table_fragments
                        .actor_status
                        .get(&actor.actor_id)
                        .map(|actor_status| actor_status.worker_id())
                        .unwrap()
                })
                .collect();

            let intersection_worker_ids: HashSet<_> = occupied_worker_ids
                .intersection(&target_worker_ids)
                .collect();

            if check_fragment_occupied && !intersection_worker_ids.is_empty() {
                println!(
                    "worker ids {:?} are still occupied by fragment #{}",
                    intersection_worker_ids, fragment_id
                );
                exit(1);
            }
        }
    }

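    // Ask for interactive confirmation unless `--yes`/`-y` was passed.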
    if !yes {
        match Confirm::new("Will perform actions on the cluster, are you sure?")
            .with_default(false)
            .with_help_message("Use the --yes or -y option to skip this prompt")
            .with_placeholder("no")
            .prompt()
        {
            Ok(true) => println!("Processing..."),
            Ok(false) => {
                println!("Abort.");
                exit(1);
            }
            Err(_) => {
                println!("Error with questionnaire, try again later");
                exit(-1);
            }
        }
    }

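    // Unregister each target worker by deleting its node from the meta service.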
    for WorkerNode { id, host, .. } in target_workers {
        let host = match host {
            None => {
                println!("Worker #{} does not have a host, skipping", id);
                continue;
            }
            Some(host) => host,
        };

        println!("Unregistering worker #{}, address: {:?}", id, host);
        if let Err(e) = meta_client.delete_worker_node(host).await {
            println!("Failed to delete worker #{}: {}", id, e.as_report());
        };
    }

    println!("Done");

    Ok(())
}