risedev/
wait.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Read;
16use std::net::{TcpStream, ToSocketAddrs};
17use std::path::Path;
18use std::thread::sleep;
19use std::time::Duration;
20
21use anyhow::{Context as _, Result, anyhow};
22use console::style;
23
24pub fn wait(
25    mut wait_func: impl FnMut() -> Result<()>,
26    f: &mut impl std::io::Write,
27    p: impl AsRef<Path>,
28    id: &str,
29    timeout: Option<std::time::Duration>,
30    detect_failure: bool,
31) -> anyhow::Result<()> {
32    let p = p.as_ref();
33    let start_time = std::time::Instant::now();
34
35    writeln!(f, "Waiting for online")?;
36
37    let mut last_error;
38
39    loop {
40        match wait_func() {
41            Ok(_) => {
42                return Ok(());
43            }
44            Err(err) => {
45                last_error = Some(err);
46            }
47        }
48
49        if let Some(ref timeout) = timeout {
50            if std::time::Instant::now() - start_time >= *timeout {
51                let context = "timeout when trying to connect";
52
53                return Err(if let Some(last_error) = last_error {
54                    last_error.context(context)
55                } else {
56                    anyhow!(context)
57                });
58            }
59        }
60
61        if detect_failure && p.exists() {
62            let mut buf = String::new();
63            fs_err::File::open(p)?.read_to_string(&mut buf)?;
64
65            let context = format!(
66                "{} exited while waiting for connection: {}",
67                style(id).red().bold(),
68                buf.trim(),
69            );
70
71            return Err(if let Some(last_error) = last_error {
72                last_error.context(context)
73            } else {
74                anyhow!(context)
75            });
76        }
77
78        sleep(Duration::from_millis(100));
79    }
80}
81
82pub fn wait_tcp_available(
83    server: impl AsRef<str>,
84    timeout: Option<std::time::Duration>,
85) -> anyhow::Result<()> {
86    let server = server.as_ref();
87    let addr = server
88        .to_socket_addrs()?
89        .next()
90        .with_context(|| format!("failed to resolve {}", server))?;
91    let start_time = std::time::Instant::now();
92
93    loop {
94        match TcpStream::connect_timeout(&addr, Duration::from_secs(1)) {
95            Ok(_) => {}
96            Err(_) => {
97                return Ok(());
98            }
99        }
100
101        if let Some(ref timeout) = timeout {
102            if std::time::Instant::now() - start_time >= *timeout {
103                return Err(anyhow!(
104                    "Failed to wait for port closing on {}. The port may still be in use by another process or application. Please ensure the port is not being used elsewhere and try again.",
105                    server
106                ));
107            }
108        }
109
110        sleep(Duration::from_millis(100));
111    }
112}