risedev/task/
tempo_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::path::{Path, PathBuf};
17use std::process::Command;
18
19use anyhow::{Result, anyhow};
20
21use super::{ExecuteContext, Task};
22use crate::util::stylized_risedev_subcmd;
23use crate::{TempoConfig, TempoGen};
24
25pub struct TempoService {
26    config: TempoConfig,
27}
28
29impl TempoService {
30    pub fn new(config: TempoConfig) -> Result<Self> {
31        Ok(Self { config })
32    }
33
34    fn tempo_path(&self) -> Result<PathBuf> {
35        let prefix_bin = env::var("PREFIX_BIN")?;
36        Ok(Path::new(&prefix_bin).join("tempo").join("tempo"))
37    }
38
39    fn tempo(&self) -> Result<Command> {
40        Ok(Command::new(self.tempo_path()?))
41    }
42
43    pub fn apply_command_args(
44        cmd: &mut Command,
45        data_path: impl AsRef<Path>,
46        config_file_path: impl AsRef<Path>,
47    ) -> Result<()> {
48        cmd.arg("--storage.trace.backend")
49            .arg("local")
50            .arg("--storage.trace.local.path")
51            .arg(data_path.as_ref().join("blocks"))
52            .arg("--storage.trace.wal.path")
53            .arg(data_path.as_ref().join("wal"))
54            .arg("--config.file")
55            .arg(config_file_path.as_ref());
56        Ok(())
57    }
58}
59
60impl Task for TempoService {
61    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
62        ctx.service(self);
63        ctx.pb.set_message("starting...");
64
65        let path = self.tempo_path()?;
66        if !path.exists() {
67            return Err(anyhow!(
68                "tempo binary not found in {:?}\nDid you enable tracing feature in `{}`?",
69                path,
70                stylized_risedev_subcmd("configure")
71            ));
72        }
73
74        let prefix_config = env::var("PREFIX_CONFIG")?;
75        let prefix_data = env::var("PREFIX_DATA")?;
76
77        let config_file_path = Path::new(&prefix_config).join("tempo.yml");
78        fs_err::write(&config_file_path, TempoGen.gen_tempo_yml(&self.config))?;
79
80        let data_path = Path::new(&prefix_data).join(self.id());
81        fs_err::create_dir_all(&data_path)?;
82
83        let mut cmd = self.tempo()?;
84        Self::apply_command_args(&mut cmd, &data_path, &config_file_path)?;
85
86        ctx.run_command(ctx.tmux_run(cmd)?)?;
87
88        ctx.pb.set_message("started");
89
90        Ok(())
91    }
92
93    fn id(&self) -> String {
94        self.config.id.clone()
95    }
96}