risingwave_ctl/cmd_impl/meta/reschedule.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::process::exit;

use anyhow::{Result, anyhow};
use inquire::Confirm;
use itertools::Itertools;
use regex::Regex;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::{GetClusterInfoResponse, PbWorkerReschedule};
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

use crate::CtlContext;

#[derive(Serialize, Deserialize, Debug)]
pub struct ReschedulePayload {
    #[serde(rename = "reschedule_revision")]
    pub reschedule_revision: u64,

    #[serde(rename = "reschedule_plan")]
    pub worker_reschedule_plan: HashMap<u32, WorkerReschedulePlan>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct WorkerReschedulePlan {
    #[serde(rename = "actor_count_diff")]
    pub actor_count_diff: HashMap<WorkerId, i32>,
}
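
// Illustrative only (not in the original file): a plan file read via the `from` path in
// `reschedule` below deserializes into `ReschedulePayload`, so it is YAML of roughly this shape;
// the revision, fragment id, worker ids, and diffs here are made-up example values:
//
//     reschedule_revision: 3
//     reschedule_plan:
//       101:
//         actor_count_diff:
//           1: 2
//           2: -2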

#[derive(Debug)]
pub enum RescheduleInput {
    String(String),
    FilePath(String),
}

impl From<WorkerReschedulePlan> for PbWorkerReschedule {
    fn from(value: WorkerReschedulePlan) -> Self {
        let WorkerReschedulePlan { actor_count_diff } = value;

        PbWorkerReschedule {
            worker_actor_diff: actor_count_diff,
        }
    }
}

impl From<PbWorkerReschedule> for WorkerReschedulePlan {
    fn from(value: PbWorkerReschedule) -> Self {
        let PbWorkerReschedule {
            worker_actor_diff: actor_count_diff,
        } = value;

        WorkerReschedulePlan {
            actor_count_diff: actor_count_diff
                .into_iter()
                .map(|(k, v)| (k, v as _))
                .collect(),
        }
    }
}

pub async fn reschedule(
    context: &CtlContext,
    plan: Option<String>,
    revision: Option<u64>,
    from: Option<String>,
    dry_run: bool,
    resolve_no_shuffle: bool,
) -> Result<()> {
    let meta_client = context.meta_client().await?;

    let (reschedules, revision) = match (plan, revision, from) {
        (Some(plan), Some(revision), None) => (parse_plan(plan)?, revision),
        (None, None, Some(path)) => {
            let file = std::fs::File::open(path)?;
            let ReschedulePayload {
                reschedule_revision,
                worker_reschedule_plan,
            } = serde_yaml::from_reader(file)?;
            (
                worker_reschedule_plan
                    .into_iter()
                    .map(|(fragment_id, worker_reschedule_plan)| {
                        (fragment_id, worker_reschedule_plan.into())
                    })
                    .collect(),
                reschedule_revision,
            )
        }
        _ => unreachable!(),
    };

    if reschedules.is_empty() {
        return Ok(());
    }

    for (fragment_id, reschedule) in &reschedules {
        println!("For fragment #{}", fragment_id);
        if !reschedule.get_worker_actor_diff().is_empty() {
            println!("\tChange: {:?}", reschedule.get_worker_actor_diff());
        }

        println!();
    }

    if !dry_run {
        println!("---------------------------");
        let (success, revision) = meta_client
            .reschedule(reschedules, revision, resolve_no_shuffle)
            .await?;

        if !success {
            println!(
                "Reschedule failed, please check the plan or the revision, current revision is {}",
                revision
            );

            return Err(anyhow!("reschedule failed"));
        }

        println!("Reschedule success, current revision is {}", revision);
    }

    Ok(())
}

// Parses plans like `1:[1:+1,2:-1,3:1];2:[1:1,2:1]`: for each fragment id, the bracketed list
// gives the signed change in actor count on each worker (`worker_id:diff`).
fn parse_plan(mut plan: String) -> Result<HashMap<u32, PbWorkerReschedule>> {
    let mut reschedules = HashMap::new();
    let regex = Regex::new(r"^(\d+):\[((?:\d+:[+-]?\d+,?)+)]$")?;
    plan.retain(|c| !c.is_whitespace());

    for fragment_reschedule_plan in plan.split(';') {
        if fragment_reschedule_plan.is_empty() {
            continue;
        }

        let captures = regex
            .captures(fragment_reschedule_plan)
            .ok_or_else(|| anyhow!("plan \"{}\" has an illegal format", fragment_reschedule_plan))?;

        let fragment_id = captures
            .get(1)
            .and_then(|mat| mat.as_str().parse::<u32>().ok())
            .ok_or_else(|| anyhow!("plan \"{}\" does not have a valid fragment id", plan))?;

        let worker_changes: Vec<&str> = captures[2].split(',').collect();

        let mut worker_actor_diff = HashMap::new();
        for worker_change in &worker_changes {
            let (worker_id, count) = worker_change.split(':').collect_tuple::<(_, _)>().unwrap();
            let worker_id = worker_id.parse().unwrap();
            let count = count.parse().unwrap();

            if let Some(dup_change) = worker_actor_diff.insert(worker_id, count) {
                anyhow::bail!(
                    "duplicate worker id {worker_id} in plan, prev {worker_id} -> {dup_change}",
                );
            }
        }

        if !worker_actor_diff.is_empty() {
            reschedules.insert(fragment_id, PbWorkerReschedule { worker_actor_diff });
        }
    }
    Ok(reschedules)
}
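
// A minimal test sketch (not part of the original file) exercising the plan format documented
// above `parse_plan`; the expected assertions follow directly from that format.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_signed_and_unsigned_diffs() {
        let reschedules = parse_plan("1:[1:+1,2:-1,3:1];2:[1:1,2:1]".to_owned()).unwrap();

        assert_eq!(reschedules.len(), 2);
        assert_eq!(reschedules[&1].worker_actor_diff[&1], 1);
        assert_eq!(reschedules[&1].worker_actor_diff[&2], -1);
        assert_eq!(reschedules[&2].worker_actor_diff[&2], 1);
    }

    #[test]
    fn rejects_duplicate_worker_ids() {
        assert!(parse_plan("1:[1:+1,1:-1]".to_owned()).is_err());
    }
}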

pub async fn unregister_workers(
    context: &CtlContext,
    workers: Vec<String>,
    yes: bool,
    ignore_not_found: bool,
    check_fragment_occupied: bool,
) -> Result<()> {
    let meta_client = context.meta_client().await?;

    let GetClusterInfoResponse {
        worker_nodes,
        table_fragments: all_table_fragments,
        ..
    } = match meta_client.get_cluster_info().await {
        Ok(info) => info,
        Err(e) => {
            println!("Failed to get cluster info: {}", e.as_report());
            exit(1);
        }
    };

    let worker_index_by_host: HashMap<_, _> = worker_nodes
        .iter()
        .map(|worker| {
            let host = worker.get_host().expect("host should not be empty");
            (format!("{}:{}", host.host, host.port), worker.id)
        })
        .collect();

    let mut target_worker_ids: HashSet<_> = HashSet::new();

    let worker_ids: HashSet<_> = worker_nodes.iter().map(|worker| worker.id).collect();

    for worker in workers {
        let worker_id = worker
            .parse::<WorkerId>()
            .ok()
            .or_else(|| worker_index_by_host.get(&worker).cloned());

        if let Some(worker_id) = worker_id
            && worker_ids.contains(&worker_id)
        {
            if !target_worker_ids.insert(worker_id) {
                println!(
                    "Warn: {} resolves to worker {}, which was already specified",
                    worker, worker_id
                );
            }
        } else if ignore_not_found {
            println!("Warn: worker {} not found, ignored", worker);
        } else {
            println!("Could not find worker {}", worker);
            exit(1);
        }
    }

    if target_worker_ids.is_empty() {
        if ignore_not_found {
            println!("Warn: No worker provided, ignored");
            return Ok(());
        }
        println!("No worker provided");
        exit(1);
    }

    let target_workers = worker_nodes
        .into_iter()
        .filter(|worker| target_worker_ids.contains(&worker.id))
        .collect_vec();

    for table_fragments in &all_table_fragments {
        for (fragment_id, fragment) in &table_fragments.fragments {
            let occupied_worker_ids: HashSet<_> = fragment
                .actors
                .iter()
                .map(|actor| {
                    table_fragments
                        .actor_status
                        .get(&actor.actor_id)
                        .map(|actor_status| actor_status.worker_id())
                        .unwrap()
                })
                .collect();

            let intersection_worker_ids: HashSet<_> = occupied_worker_ids
                .intersection(&target_worker_ids)
                .collect();

            if check_fragment_occupied && !intersection_worker_ids.is_empty() {
                println!(
                    "worker ids {:?} are still occupied by fragment #{}",
                    intersection_worker_ids, fragment_id
                );
                exit(1);
            }
        }
    }

    if !yes {
        match Confirm::new("Will perform actions on the cluster, are you sure?")
            .with_default(false)
            .with_help_message("Use the --yes or -y option to skip this prompt")
            .with_placeholder("no")
            .prompt()
        {
            Ok(true) => println!("Processing..."),
            Ok(false) => {
                println!("Abort.");
                exit(1);
            }
            Err(_) => {
                println!("Error with questionnaire, try again later");
                exit(-1);
            }
        }
    }

    for WorkerNode { id, host, .. } in target_workers {
        let host = match host {
            None => {
                println!("Worker #{} does not have a host, skipping", id);
                continue;
            }
            Some(host) => host,
        };

        println!("Unregistering worker #{}, address: {:?}", id, host);
        if let Err(e) = meta_client.delete_worker_node(host).await {
            println!("Failed to delete worker #{}: {}", id, e.as_report());
        }
    }

    println!("Done");

    Ok(())
}