risingwave_ctl/cmd_impl/meta/reschedule.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::process::exit;

use anyhow::{Result, anyhow};
use inquire::Confirm;
use itertools::Itertools;
use regex::Regex;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::{GetClusterInfoResponse, PbWorkerReschedule};
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

use crate::CtlContext;

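/// Payload read from the YAML file passed via the `from` input of
/// [`reschedule`]. For reference, a file matching these structs could look
/// like the following (all ids and counts are illustrative):
///
/// ```yaml
/// reschedule_revision: 42
/// reschedule_plan:
///   1: # fragment id
///     actor_count_diff:
///       1: 1   # worker 1 gains one actor
///       2: -1  # worker 2 loses one actor
/// ```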
#[derive(Serialize, Deserialize, Debug)]
pub struct ReschedulePayload {
    #[serde(rename = "reschedule_revision")]
    pub reschedule_revision: u64,

    #[serde(rename = "reschedule_plan")]
    pub worker_reschedule_plan: HashMap<u32, WorkerReschedulePlan>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct WorkerReschedulePlan {
    #[serde(rename = "actor_count_diff")]
    pub actor_count_diff: HashMap<WorkerId, i32>,
}

#[derive(Debug)]
pub enum RescheduleInput {
    String(String),
    FilePath(String),
}

impl From<WorkerReschedulePlan> for PbWorkerReschedule {
    fn from(value: WorkerReschedulePlan) -> Self {
        let WorkerReschedulePlan { actor_count_diff } = value;

        // The key/value types differ between the meta model and the protobuf
        // map, hence the casts.
        PbWorkerReschedule {
            worker_actor_diff: actor_count_diff
                .into_iter()
                .map(|(k, v)| (k as _, v as _))
                .collect(),
        }
    }
}

impl From<PbWorkerReschedule> for WorkerReschedulePlan {
    fn from(value: PbWorkerReschedule) -> Self {
        let PbWorkerReschedule {
            worker_actor_diff: actor_count_diff,
        } = value;

        WorkerReschedulePlan {
            actor_count_diff: actor_count_diff
                .into_iter()
                .map(|(k, v)| (k as _, v as _))
                .collect(),
        }
    }
}

/// Applies (or, with `dry_run`, merely prints) a worker-level reschedule plan.
pub async fn reschedule(
    context: &CtlContext,
    plan: Option<String>,
    revision: Option<u64>,
    from: Option<String>,
    dry_run: bool,
    resolve_no_shuffle: bool,
) -> Result<()> {
    let meta_client = context.meta_client().await?;

    let (reschedules, revision) = match (plan, revision, from) {
        (Some(plan), Some(revision), None) => (parse_plan(plan)?, revision),
        (None, None, Some(path)) => {
            let file = std::fs::File::open(path)?;
            let ReschedulePayload {
                reschedule_revision,
                worker_reschedule_plan,
            } = serde_yaml::from_reader(file)?;
            (
                worker_reschedule_plan
                    .into_iter()
                    .map(|(fragment_id, worker_reschedule_plan)| {
                        (fragment_id, worker_reschedule_plan.into())
                    })
                    .collect(),
                reschedule_revision,
            )
        }
        // The CLI argument definitions guarantee that exactly one of
        // `(plan, revision)` or `from` is provided.
        _ => unreachable!(),
    };

    if reschedules.is_empty() {
        return Ok(());
    }

    for (fragment_id, reschedule) in &reschedules {
        println!("For fragment #{}", fragment_id);
        if !reschedule.get_worker_actor_diff().is_empty() {
            println!("\tChange: {:?}", reschedule.get_worker_actor_diff());
        }

        println!();
    }

    if !dry_run {
        println!("---------------------------");
        let (success, revision) = meta_client
            .reschedule(reschedules, revision, resolve_no_shuffle)
            .await?;

        if !success {
            println!(
                "Reschedule failed, please check the plan or the revision, current revision is {}",
                revision
            );

            return Err(anyhow!("reschedule failed"));
        }

        println!("Reschedule success, current revision is {}", revision);
    }

    Ok(())
}
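
// For illustration only: assuming the risectl flags mirror the parameters
// above (the flag names are not verified here), invocations could look like
//
//   risectl meta reschedule --plan '1:[1:+1,2:-1]' --revision 100
//   risectl meta reschedule --from plan.yaml --dry-run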

// Parses plans of the form `1:[1:+1,2:-1,3:1];2:[1:1,2:1]`: for each fragment,
// a map from worker id to the desired change in its actor count. For example,
// `1:[1:+1,2:-1]` means that for fragment 1, worker 1 gains one actor and
// worker 2 loses one. A leading `+` on the count is optional.
fn parse_plan(mut plan: String) -> Result<HashMap<u32, PbWorkerReschedule>> {
    let mut reschedules = HashMap::new();
    let regex = Regex::new(r"^(\d+):\[((?:\d+:[+-]?\d+,?)+)]$")?;
    plan.retain(|c| !c.is_whitespace());

    for fragment_reschedule_plan in plan.split(';') {
        if fragment_reschedule_plan.is_empty() {
            continue;
        }

        let captures = regex.captures(fragment_reschedule_plan).ok_or_else(|| {
            anyhow!(
                "plan segment \"{}\" has an invalid format",
                fragment_reschedule_plan
            )
        })?;

        let fragment_id = captures
            .get(1)
            .and_then(|mat| mat.as_str().parse::<u32>().ok())
            .ok_or_else(|| {
                anyhow!(
                    "plan segment \"{}\" does not have a valid fragment id",
                    fragment_reschedule_plan
                )
            })?;

        let worker_changes: Vec<&str> = captures[2].split(',').collect();

        let mut worker_actor_diff = HashMap::new();
        for worker_change in &worker_changes {
            // The regex above guarantees the `<worker_id>:<count>` shape, but
            // parse explicitly so that a malformed or out-of-range value
            // surfaces as an error instead of a panic.
            let (worker_id, count) = worker_change
                .split(':')
                .collect_tuple::<(_, _)>()
                .ok_or_else(|| anyhow!("invalid worker change \"{}\"", worker_change))?;
            let worker_id = worker_id.parse::<u32>()?;
            let count = count.parse::<i32>()?;

            if let Some(dup_change) = worker_actor_diff.insert(worker_id, count) {
                anyhow::bail!(
                    "duplicate worker id {worker_id} in plan, prev {worker_id} -> {dup_change}",
                );
            }
        }

        if !worker_actor_diff.is_empty() {
            reschedules.insert(fragment_id, PbWorkerReschedule { worker_actor_diff });
        }
    }
    Ok(reschedules)
}

/// Unregisters the given workers (each specified by numeric id or by
/// `host:port` address) from the meta node.
pub async fn unregister_workers(
    context: &CtlContext,
    workers: Vec<String>,
    yes: bool,
    ignore_not_found: bool,
    check_fragment_occupied: bool,
) -> Result<()> {
    let meta_client = context.meta_client().await?;

    let GetClusterInfoResponse {
        worker_nodes,
        table_fragments: all_table_fragments,
        ..
    } = match meta_client.get_cluster_info().await {
        Ok(info) => info,
        Err(e) => {
            println!("Failed to get cluster info: {}", e.as_report());
            exit(1);
        }
    };

    let worker_index_by_host: HashMap<_, _> = worker_nodes
        .iter()
        .map(|worker| {
            let host = worker.get_host().expect("host should not be empty");
            (format!("{}:{}", host.host, host.port), worker.id)
        })
        .collect();

    let mut target_worker_ids: HashSet<_> = HashSet::new();

    let worker_ids: HashSet<_> = worker_nodes.iter().map(|worker| worker.id).collect();

    // Resolve each entry first as a numeric worker id, then as a `host:port`
    // address.
    for worker in workers {
        let worker_id = worker
            .parse::<u32>()
            .ok()
            .or_else(|| worker_index_by_host.get(&worker).cloned());

        if let Some(worker_id) = worker_id
            && worker_ids.contains(&worker_id)
        {
            if !target_worker_ids.insert(worker_id) {
                println!(
                    "Warn: worker {} (id {}) was specified more than once",
                    worker, worker_id
                );
            }
        } else {
            if ignore_not_found {
                println!("Warn: worker {} not found, ignored", worker);
                continue;
            }

            println!("Could not find worker {}", worker);
            exit(1);
        }
    }

    if target_worker_ids.is_empty() {
        if ignore_not_found {
            println!("Warn: No worker provided, ignored");
            return Ok(());
        }
        println!("No worker provided");
        exit(1);
    }

    let target_workers = worker_nodes
        .into_iter()
        .filter(|worker| target_worker_ids.contains(&worker.id))
        .collect_vec();

    // Refuse to proceed if any target worker still hosts actors of some
    // fragment, unless the check is disabled.
    for table_fragments in &all_table_fragments {
        for (fragment_id, fragment) in &table_fragments.fragments {
            let occupied_worker_ids: HashSet<_> = fragment
                .actors
                .iter()
                .map(|actor| {
                    table_fragments
                        .actor_status
                        .get(&actor.actor_id)
                        .map(|actor_status| actor_status.worker_id())
                        .expect("actor status should exist for actor")
                })
                .collect();

            let intersection_worker_ids: HashSet<_> = occupied_worker_ids
                .intersection(&target_worker_ids)
                .collect();

            if check_fragment_occupied && !intersection_worker_ids.is_empty() {
                println!(
                    "worker ids {:?} are still occupied by fragment #{}",
                    intersection_worker_ids, fragment_id
                );
                exit(1);
            }
        }
    }

    if !yes {
        match Confirm::new("Will perform actions on the cluster, are you sure?")
            .with_default(false)
            .with_help_message("Use the --yes or -y option to skip this prompt")
            .with_placeholder("no")
            .prompt()
        {
            Ok(true) => println!("Processing..."),
            Ok(false) => {
                println!("Abort.");
                exit(1);
            }
            Err(_) => {
                println!("Error with questionnaire, try again later");
                exit(-1);
            }
        }
    }

    for WorkerNode { id, host, .. } in target_workers {
        let host = match host {
            None => {
                println!("Worker #{} does not have a host, skipping", id);
                continue;
            }
            Some(host) => host,
        };

        println!("Unregistering worker #{}, address: {:?}", id, host);
        if let Err(e) = meta_client.delete_worker_node(host).await {
            println!("Failed to delete worker #{}: {}", id, e.as_report());
        }
    }

    println!("Done");

    Ok(())
}
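
// Minimal test sketches for the pure helpers above; the expected values follow
// from the plan format documented at `parse_plan` and from the serde renames
// on the payload structs. All concrete ids and counts are illustrative.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_plan_accepts_documented_format() {
        let reschedules = parse_plan("1:[1:+1,2:-1,3:1];2:[1:1,2:1]".to_owned()).unwrap();
        assert_eq!(reschedules.len(), 2);

        let fragment_1 = &reschedules[&1];
        assert_eq!(fragment_1.worker_actor_diff[&1], 1);
        assert_eq!(fragment_1.worker_actor_diff[&2], -1);
        assert_eq!(fragment_1.worker_actor_diff[&3], 1);
    }

    #[test]
    fn parse_plan_rejects_duplicate_worker() {
        assert!(parse_plan("1:[1:+1,1:-1]".to_owned()).is_err());
    }

    #[test]
    fn yaml_payload_deserializes() {
        let yaml = r#"
reschedule_revision: 42
reschedule_plan:
  1:
    actor_count_diff:
      1: 1
      2: -1
"#;
        let payload: ReschedulePayload = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(payload.reschedule_revision, 42);
        assert_eq!(payload.worker_reschedule_plan[&1].actor_count_diff[&1], 1);
        assert_eq!(payload.worker_reschedule_plan[&1].actor_count_diff[&2], -1);
    }
}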