risedev/task/
moat_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::path::{Path, PathBuf};
17use std::process::Command;
18
19use anyhow::{Result, anyhow};
20
21use super::{ExecuteContext, Task};
22use crate::MoatConfig;
23use crate::util::stylized_risedev_subcmd;
24
25#[derive(Debug)]
26pub struct MoatService {
27    config: MoatConfig,
28}
29
30impl MoatService {
31    pub fn new(config: MoatConfig) -> Result<Self> {
32        Ok(Self { config })
33    }
34
35    fn moat_path(&self) -> Result<PathBuf> {
36        let prefix_bin = env::var("PREFIX_BIN")?;
37        Ok(Path::new(&prefix_bin).join("moat"))
38    }
39
40    fn moat(&self) -> Result<Command> {
41        Ok(Command::new(self.moat_path()?))
42    }
43
44    fn cache_dir(&self) -> Result<PathBuf> {
45        let prefix_data = env::var("PREFIX_DATA")?;
46        Ok(Path::new(&prefix_data)
47            .join("moat")
48            .join(format!("cache-{}", self.config.port)))
49    }
50
51    pub fn apply_command_args(cmd: &mut Command, config: &MoatConfig) -> Result<()> {
52        let minios = config
53            .provide_minio
54            .as_ref()
55            .ok_or_else(|| anyhow!("minio not provided"))?;
56
57        if minios.is_empty() || minios.len() > 1 {
58            return Err(anyhow!(
59                "expected exactly one minio instance, got {}",
60                minios.len()
61            ));
62        }
63
64        let minio = &minios[0];
65
66        cmd.arg("--listen").arg(format!("0.0.0.0:{}", config.port));
67        cmd.arg("--peer")
68            .arg(format!("{}:{}", config.address, config.port));
69        cmd.arg("--bootstrap-peers")
70            .arg(format!("{}:{}", config.address, config.port));
71        cmd.arg("--s3-endpoint")
72            .arg(format!("http://{}:{}", minio.listen_address, minio.port));
73        cmd.arg("--s3-access-key-id").arg(&minio.root_user);
74        cmd.arg("--s3-secret-access-key").arg(&minio.root_password);
75        cmd.arg("--s3-bucket").arg(&minio.hummock_bucket);
76        cmd.arg("--s3-region").arg("us-east-1");
77        cmd.arg("--weight").arg("1");
78        cmd.arg("--mem").arg("64MiB");
79        cmd.arg("--disk").arg("1GiB");
80
81        let prefix_log = env::var("PREFIX_LOG")?;
82        let log_dir = Path::new(&prefix_log)
83            .join("moat")
84            .join(format!("moat:{}", config.port));
85        cmd.arg("--telemetry-logging-dir").arg(log_dir);
86
87        cmd.env("RUST_LOG", "info");
88
89        Ok(())
90    }
91}
92
93impl Task for MoatService {
94    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
95        ctx.service(self);
96        ctx.pb.set_message("starting...");
97
98        let path = self.moat_path()?;
99        if !path.exists() {
100            return Err(anyhow!(
101                "moat binary not found in {:?}\nDid you enable moat feature in `{}`?",
102                path,
103                stylized_risedev_subcmd("configure")
104            ));
105        }
106
107        let mut cmd = self.moat()?;
108        cmd.arg("--dir").arg(self.cache_dir()?);
109
110        Self::apply_command_args(&mut cmd, &self.config)?;
111        ctx.run_command(ctx.tmux_run(cmd)?)?;
112
113        ctx.pb.set_message("started");
114
115        Ok(())
116    }
117
118    fn id(&self) -> String {
119        self.config.id.clone()
120    }
121}