risedev/task/
task_db_ready_check.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use anyhow::Context as _;
18use sqlx::{ConnectOptions, Connection as _};
19
20use super::{ExecuteContext, Task};
21use crate::wait::wait;
22
/// Task that checks whether a database is ready to use.
///
/// `O` holds the connection settings. The `Task` implementation for this type requires `O` to
/// be an `sqlx` `ConnectOptions`; the struct itself places no bound on it.
pub struct DbReadyCheckTask<O> {
    /// Connection settings for the database being checked.
    options: O,
}

impl<O> DbReadyCheckTask<O> {
    /// Creates a readiness check for the database described by `options`.
    pub fn new(options: O) -> DbReadyCheckTask<O> {
        DbReadyCheckTask { options }
    }
}
33
34impl<O> Task for DbReadyCheckTask<O>
35where
36    O: ConnectOptions,
37    O::Connection: Sized,
38{
39    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
40        let Some(id) = ctx.id.clone() else {
41            panic!("Service should be set before executing DbReadyCheckTask");
42        };
43
44        ctx.pb.set_message("waiting for ready...");
45
46        wait(
47            || {
48                let options = self.options.clone();
49
50                let rt = tokio::runtime::Builder::new_current_thread()
51                    .enable_all()
52                    .build()?;
53
54                rt.block_on(async move {
55                    let mut conn = options
56                        .connect()
57                        .await
58                        .context("failed to connect to database")?;
59                    conn.ping().await.context("failed to ping database")?;
60                    Ok(())
61                })
62            },
63            &mut ctx.log,
64            ctx.status_file.as_ref().unwrap(),
65            &id,
66            Some(Duration::from_secs(20)),
67            true,
68        )
69        .with_context(|| format!("failed to wait for service `{id}` to be ready"))?;
70
71        ctx.complete_spin();
72
73        Ok(())
74    }
75}