risedev/task/ensure_stop_service.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Result;
16
17use super::{ExecuteContext, Task};
18
19pub struct EnsureStopService {
20 /// `(port, id, user_managed)`
21 ports: Vec<(u16, String, bool)>,
22}
23
24impl EnsureStopService {
25 pub fn new(ports: Vec<(u16, String, bool)>) -> Result<Self> {
26 Ok(Self { ports })
27 }
28}
29
30impl Task for EnsureStopService {
31 fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
32 ctx.service(self);
33
34 for (port, service_id, user_managed) in &self.ports {
35 // Do not require stopping user-managed services
36 if *user_managed {
37 continue;
38 }
39 let address = format!("127.0.0.1:{}", port);
40
41 ctx.pb.set_message(format!(
42 "waiting for port close - {} (will be used by {})",
43 address, service_id
44 ));
45 ctx.wait_tcp_close(&address)?;
46 }
47
48 ctx.pb
49 .set_message("all previous services have been stopped");
50
51 ctx.complete_spin();
52
53 Ok(())
54 }
55
56 fn id(&self) -> String {
57 "prepare".into()
58 }
59}