risedev/task/
ensure_stop_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Result;
16
17use super::{ExecuteContext, Task};
18
/// Task that checks a list of local TCP ports before other services start.
pub struct EnsureStopService {
    /// One entry per port to check, laid out as `(port, id, user_managed)`:
    ///
    /// - `port`: TCP port on `127.0.0.1`.
    /// - `id`: identifier of the service that will use the port (shown in the
    ///   progress message).
    /// - `user_managed`: when `true`, the entry is skipped and its port is
    ///   never waited on.
    ports: Vec<(u16, String, bool)>,
}
23
24impl EnsureStopService {
25    pub fn new(ports: Vec<(u16, String, bool)>) -> Result<Self> {
26        Ok(Self { ports })
27    }
28}
29
30impl Task for EnsureStopService {
31    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
32        ctx.service(self);
33
34        for (port, service_id, user_managed) in &self.ports {
35            // Do not require stopping user-managed services
36            if *user_managed {
37                continue;
38            }
39            let address = format!("127.0.0.1:{}", port);
40
41            ctx.pb.set_message(format!(
42                "waiting for port close - {} (will be used by {})",
43                address, service_id
44            ));
45            ctx.wait_tcp_close(&address)?;
46        }
47
48        ctx.pb
49            .set_message("all previous services have been stopped");
50
51        ctx.complete_spin();
52
53        Ok(())
54    }
55
56    fn id(&self) -> String {
57        "prepare".into()
58    }
59}