risedev/task/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::path::Path;
17use std::process::Command;
18use std::sync::Once;
19
20use anyhow::{Result, anyhow};
21use itertools::Itertools;
22
23use super::ExecuteContext;
24use crate::util::is_env_set;
25use crate::{AwsS3Config, MetaNodeConfig, MinioConfig, MoatConfig, OpendalConfig, TempoConfig};
26
impl<W> ExecuteContext<W>
where
    W: std::io::Write,
{
    /// Get the command for starting the given component of RisingWave.
    ///
    /// The binary is resolved from the `USE_SYSTEM_RISINGWAVE` environment variable:
    /// - `docker:<tag>`: run `risingwavelabs/risingwave:<tag>` via `docker run`
    ///   (pulling the image first);
    /// - any other set value: use the `risingwave` binary found on `PATH`;
    /// - unset: use the locally built binary under `$PREFIX_BIN/risingwave/<component>`.
    ///
    /// In all cases `--config-path $PREFIX_CONFIG/risingwave.toml` is appended,
    /// and `RUST_BACKTRACE` / `CONNECTOR_LIBS_PATH` are set when the
    /// corresponding risedev options are enabled.
    pub fn risingwave_cmd(&mut self, component: &str) -> Result<Command> {
        let mut cmd = if let Ok(tag) = env::var("USE_SYSTEM_RISINGWAVE")
            && let Some(tag) = tag.strip_prefix("docker:")
        {
            let image = format!("risingwavelabs/risingwave:{}", tag);

            // Before returning the command, pull the image first.
            self.pb
                .set_message(format!("pulling docker image \"{image}\"..."));
            // Pull at most once per process, even when several components are
            // started. NOTE(review): `call_once` captures the image of the
            // first invocation only — if the tag could differ between calls in
            // one run, later images would never be pulled; confirm this
            // cannot happen in practice.
            static DOCKER_PULL: Once = Once::new();
            DOCKER_PULL.call_once(|| {
                let mut pull_cmd = Command::new("docker");
                pull_cmd.arg("pull").arg(&image);
                let output = pull_cmd.output().expect("Failed to pull docker image");
                // On a non-zero exit status, panic with docker's stderr so the
                // failure reason is visible to the user.
                output.status.exit_ok().unwrap_or_else(|_| {
                    panic!(
                        "Failed to pull docker image: {}",
                        String::from_utf8_lossy(&output.stderr)
                    )
                })
            });

            let wd = env::var("PREFIX")?; // passthrough the working directory
            let name = format!("risedev-{}", self.id.as_ref().unwrap());

            let mut cmd = Command::new("docker");
            cmd.arg("run")
                .arg("-it")
                .arg("--rm")
                .arg("--name")
                .arg(&name)
                .arg("--network")
                .arg("host")
                // Release build has a premium license working with <= 4 RWUs, i.e.,
                // 4 CPUs and 16 GiB of memory.
                .arg("--cpus")
                .arg("4")
                .arg("--memory")
                .arg("16g")
                // Mount the working directory at the same path inside the
                // container so paths passed through remain valid.
                .arg("-v")
                .arg(format!("{wd}:{wd}"))
                .arg(&image)
                .arg(component);
            cmd
        } else if is_env_set("USE_SYSTEM_RISINGWAVE") {
            // Use the `risingwave` binary installed on the system `PATH`.
            let mut cmd = Command::new("risingwave");
            cmd.arg(component);
            cmd
        } else {
            // Default: the locally built multi-call binary under `PREFIX_BIN`.
            let prefix_bin = std::env::var("PREFIX_BIN")?;
            let path = Path::new(&prefix_bin).join("risingwave").join(component);
            Command::new(path)
        };

        if crate::util::is_enable_backtrace() {
            cmd.env("RUST_BACKTRACE", "1");
        }

        if crate::util::is_env_set("ENABLE_BUILD_RW_CONNECTOR") {
            let prefix_bin = env::var("PREFIX_BIN")?;
            cmd.env(
                "CONNECTOR_LIBS_PATH",
                Path::new(&prefix_bin).join("connector-node/libs/"),
            );
        }

        // Always point the component at the generated risedev config file.
        let prefix_config = env::var("PREFIX_CONFIG")?;
        cmd.arg("--config-path")
            .arg(Path::new(&prefix_config).join("risingwave.toml"));

        Ok(cmd)
    }
}
105
106/// Add a meta node to the parameters.
107pub fn add_meta_node(provide_meta_node: &[MetaNodeConfig], cmd: &mut Command) -> Result<()> {
108    match provide_meta_node {
109        [] => {
110            return Err(anyhow!(
111                "Cannot configure node: no meta node found in this configuration."
112            ));
113        }
114        meta_nodes => {
115            cmd.arg("--meta-address").arg(
116                meta_nodes
117                    .iter()
118                    .map(|meta_node| format!("http://{}:{}", meta_node.address, meta_node.port))
119                    .join(","),
120            );
121        }
122    };
123
124    Ok(())
125}
126
127/// Add the tempo endpoint to the environment variables.
128pub fn add_tempo_endpoint(provide_tempo: &[TempoConfig], cmd: &mut Command) -> Result<()> {
129    match provide_tempo {
130        [] => {}
131        [tempo] => {
132            cmd.env(
133                "RW_TRACING_ENDPOINT",
134                format!("http://{}:{}", tempo.address, tempo.otlp_port),
135            );
136        }
137        _ => {
138            return Err(anyhow!(
139                "{} Tempo instance found in config, but only 1 is needed",
140                provide_tempo.len()
141            ));
142        }
143    }
144
145    Ok(())
146}
147
/// Strategy for whether to enable in-memory hummock if no minio and s3 is provided.
///
/// See [`add_hummock_backend`]: the in-memory backend is reported as neither
/// shared nor persistent, so multi-node setups must disallow it.
pub enum HummockInMemoryStrategy {
    /// Enable in-memory hummock. Used by single-node configuration.
    Allowed,
    /// Disallow in-memory hummock. Always requires minio or s3.
    Disallowed,
}
155
156/// Add a hummock storage backend to the parameters. Returns `(is_shared_backend, is_persistent_backend)`.
157pub fn add_hummock_backend(
158    id: &str,
159    provide_opendal: &[OpendalConfig],
160    provide_minio: &[MinioConfig],
161    provide_aws_s3: &[AwsS3Config],
162    provide_moat: &[MoatConfig],
163    hummock_in_memory_strategy: HummockInMemoryStrategy,
164    cmd: &mut Command,
165) -> Result<(bool, bool)> {
166    let (is_shared_backend, is_persistent_backend) = match (
167        provide_minio,
168        provide_aws_s3,
169        provide_opendal,
170        provide_moat,
171    ) {
172        ([], [], [], []) => match hummock_in_memory_strategy {
173            HummockInMemoryStrategy::Allowed => {
174                cmd.arg("--state-store").arg("hummock+memory");
175                (false, false)
176            }
177            HummockInMemoryStrategy::Disallowed => {
178                return Err(anyhow!(
179                    "{} is not compatible with in-memory state backend. Need to enable either minio or aws-s3.",
180                    id
181                ));
182            }
183        },
184        ([minio], [], [], []) => {
185            cmd.arg("--state-store").arg(format!(
186                "hummock+minio://{hummock_user}:{hummock_password}@{minio_addr}:{minio_port}/{hummock_bucket}",
187                hummock_user = minio.root_user,
188                hummock_password = minio.root_password,
189                hummock_bucket = minio.hummock_bucket,
190                minio_addr = minio.address,
191                minio_port = minio.port,
192            ));
193            (true, true)
194        }
195        ([], [aws_s3], [], []) => {
196            cmd.arg("--state-store")
197                .arg(format!("hummock+s3://{}", aws_s3.bucket));
198            (true, true)
199        }
200        ([], [], [opendal], []) => {
201            if opendal.engine == "hdfs" {
202                cmd.arg("--state-store")
203                    .arg(format!("hummock+hdfs://{}", opendal.namenode));
204            } else if opendal.engine == "gcs" {
205                cmd.arg("--state-store")
206                    .arg(format!("hummock+gcs://{}", opendal.bucket));
207            } else if opendal.engine == "obs" {
208                cmd.arg("--state-store")
209                    .arg(format!("hummock+obs://{}", opendal.bucket));
210            } else if opendal.engine == "oss" {
211                cmd.arg("--state-store")
212                    .arg(format!("hummock+oss://{}", opendal.bucket));
213            } else if opendal.engine == "webhdfs" {
214                cmd.arg("--state-store")
215                    .arg(format!("hummock+webhdfs://{}", opendal.namenode));
216            } else if opendal.engine == "azblob" {
217                cmd.arg("--state-store")
218                    .arg(format!("hummock+azblob://{}", opendal.bucket));
219            } else if opendal.engine == "fs" {
220                println!("using fs engine xxxx");
221                cmd.arg("--state-store")
222                    .arg(format!("hummock+fs://{}", opendal.bucket));
223            } else {
224                unimplemented!()
225            }
226            (true, true)
227        }
228        ([minio], _, _, [moat]) => {
229            cmd.arg("--state-store").arg(format!(
230                "hummock+minio://{hummock_user}:{hummock_password}@{moat_addr}:{moat_port}/{hummock_bucket}",
231                hummock_user = minio.root_user,
232                hummock_password = minio.root_password,
233                hummock_bucket = minio.hummock_bucket,
234                moat_addr = moat.address,
235                moat_port = moat.port,
236            ));
237            (true, true)
238        }
239        (other_minio, other_s3, _, _) => {
240            return Err(anyhow!(
241                "{} minio and {} s3 instance found in config, but only 1 is needed",
242                other_minio.len(),
243                other_s3.len()
244            ));
245        }
246    };
247
248    Ok((is_shared_backend, is_persistent_backend))
249}