risedev/task/
task_log_ready_check.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::{Read as _, Seek as _, SeekFrom};
16use std::time::Duration;
17
18use anyhow::{Context, Result, bail};
19use fs_err::File;
20
21use super::{ExecuteContext, Task};
22use crate::wait::wait;
23
24/// Check if all log patterns are found in the log output indicating the service is ready.
25pub struct LogReadyCheckTask {
26    patterns: Vec<String>,
27}
28
29impl LogReadyCheckTask {
30    pub fn new(pattern: impl Into<String>) -> Result<Self> {
31        Ok(Self {
32            patterns: vec![pattern.into()],
33        })
34    }
35
36    pub fn new_all(patterns: impl IntoIterator<Item = impl Into<String>>) -> Result<Self> {
37        Ok(Self {
38            patterns: patterns.into_iter().map(Into::into).collect(),
39        })
40    }
41}
42
43impl Task for LogReadyCheckTask {
44    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
45        let Some(id) = ctx.id.clone() else {
46            panic!("Service should be set before executing LogReadyCheckTask");
47        };
48
49        ctx.pb.set_message("waiting for ready...");
50        ctx.wait_log_contains(&self.patterns)
51            .with_context(|| format!("failed to wait for service `{id}` to be ready"))?;
52
53        ctx.complete_spin();
54
55        Ok(())
56    }
57}
58
59impl<W> ExecuteContext<W>
60where
61    W: std::io::Write,
62{
63    fn wait_log_contains(&mut self, patterns: &[String]) -> anyhow::Result<()> {
64        let log_path = self.log_path().to_path_buf();
65
66        let mut content = String::new();
67        let mut offset = 0;
68
69        wait(
70            || {
71                let mut file = File::open(&log_path).context("log file does not exist")?;
72                file.seek(SeekFrom::Start(offset as u64))?;
73                offset += file.read_to_string(&mut content)?;
74
75                // Always going through the whole log file could be stupid, but it's reliable.
76                for pattern in patterns {
77                    if !content.contains(pattern) {
78                        bail!("pattern \"{}\" not found in log", pattern)
79                    }
80                }
81
82                Ok(())
83            },
84            &mut self.log,
85            self.status_file.as_ref().unwrap(),
86            self.id.as_ref().unwrap(),
87            Some(Duration::from_secs(60)),
88            true,
89        )?;
90
91        Ok(())
92    }
93}