risedev/task/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::path::Path;
17use std::process::Command;
18use std::sync::Once;
19
20use anyhow::{Result, anyhow};
21use itertools::Itertools;
22
23use super::ExecuteContext;
24use crate::util::is_env_set;
25use crate::{AwsS3Config, MetaNodeConfig, MinioConfig, OpendalConfig, TempoConfig};
26
27impl<W> ExecuteContext<W>
28where
29    W: std::io::Write,
30{
31    /// Get the command for starting the given component of RisingWave.
32    pub fn risingwave_cmd(&mut self, component: &str) -> Result<Command> {
33        let mut cmd = if let Ok(tag) = env::var("USE_SYSTEM_RISINGWAVE")
34            && let Some(tag) = tag.strip_prefix("docker:")
35        {
36            let image = format!("risingwavelabs/risingwave:{}", tag);
37
38            // Before returning the command, pull the image first.
39            self.pb
40                .set_message(format!("pulling docker image \"{image}\"..."));
41            static DOCKER_PULL: Once = Once::new();
42            DOCKER_PULL.call_once(|| {
43                let mut pull_cmd = Command::new("docker");
44                pull_cmd.arg("pull").arg(&image);
45                let output = pull_cmd.output().expect("Failed to pull docker image");
46                output.status.exit_ok().unwrap_or_else(|_| {
47                    panic!(
48                        "Failed to pull docker image: {}",
49                        String::from_utf8_lossy(&output.stderr)
50                    )
51                })
52            });
53
54            let wd = env::var("PREFIX")?; // passthrough the working directory
55            let name = format!("risedev-{}", self.id.as_ref().unwrap());
56
57            let mut cmd = Command::new("docker");
58            cmd.arg("run")
59                .arg("-it")
60                .arg("--rm")
61                .arg("--name")
62                .arg(&name)
63                .arg("--network")
64                .arg("host")
65                .arg("--cpus")
66                .arg("4") // release build has a paid license working with <= 4 cpus
67                .arg("-v")
68                .arg(format!("{wd}:{wd}"))
69                .arg(&image)
70                .arg(component);
71            cmd
72        } else if is_env_set("USE_SYSTEM_RISINGWAVE") {
73            let mut cmd = Command::new("risingwave");
74            cmd.arg(component);
75            cmd
76        } else {
77            let prefix_bin = std::env::var("PREFIX_BIN")?;
78            let path = Path::new(&prefix_bin).join("risingwave").join(component);
79            Command::new(path)
80        };
81
82        if crate::util::is_enable_backtrace() {
83            cmd.env("RUST_BACKTRACE", "1");
84        }
85
86        if crate::util::is_env_set("ENABLE_BUILD_RW_CONNECTOR") {
87            let prefix_bin = env::var("PREFIX_BIN")?;
88            cmd.env(
89                "CONNECTOR_LIBS_PATH",
90                Path::new(&prefix_bin).join("connector-node/libs/"),
91            );
92        }
93
94        let prefix_config = env::var("PREFIX_CONFIG")?;
95        cmd.arg("--config-path")
96            .arg(Path::new(&prefix_config).join("risingwave.toml"));
97
98        Ok(cmd)
99    }
100}
101
102/// Add a meta node to the parameters.
103pub fn add_meta_node(provide_meta_node: &[MetaNodeConfig], cmd: &mut Command) -> Result<()> {
104    match provide_meta_node {
105        [] => {
106            return Err(anyhow!(
107                "Cannot configure node: no meta node found in this configuration."
108            ));
109        }
110        meta_nodes => {
111            cmd.arg("--meta-address").arg(
112                meta_nodes
113                    .iter()
114                    .map(|meta_node| format!("http://{}:{}", meta_node.address, meta_node.port))
115                    .join(","),
116            );
117        }
118    };
119
120    Ok(())
121}
122
123/// Add the tempo endpoint to the environment variables.
124pub fn add_tempo_endpoint(provide_tempo: &[TempoConfig], cmd: &mut Command) -> Result<()> {
125    match provide_tempo {
126        [] => {}
127        [tempo] => {
128            cmd.env(
129                "RW_TRACING_ENDPOINT",
130                format!("http://{}:{}", tempo.address, tempo.otlp_port),
131            );
132        }
133        _ => {
134            return Err(anyhow!(
135                "{} Tempo instance found in config, but only 1 is needed",
136                provide_tempo.len()
137            ));
138        }
139    }
140
141    Ok(())
142}
143
/// Strategy for whether to fall back to in-memory hummock when no storage backend
/// (minio, aws-s3 or opendal) is provided to [`add_hummock_backend`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HummockInMemoryStrategy {
    /// Enable in-memory hummock. Used by single-node configuration.
    Allowed,
    /// Disallow in-memory hummock. A storage backend must always be provided;
    /// otherwise [`add_hummock_backend`] returns an error.
    Disallowed,
}
151
152/// Add a hummock storage backend to the parameters. Returns `(is_shared_backend, is_persistent_backend)`.
153pub fn add_hummock_backend(
154    id: &str,
155    provide_opendal: &[OpendalConfig],
156    provide_minio: &[MinioConfig],
157    provide_aws_s3: &[AwsS3Config],
158    hummock_in_memory_strategy: HummockInMemoryStrategy,
159    cmd: &mut Command,
160) -> Result<(bool, bool)> {
161    let (is_shared_backend, is_persistent_backend) = match (
162        provide_minio,
163        provide_aws_s3,
164        provide_opendal,
165    ) {
166        ([], [], []) => match hummock_in_memory_strategy {
167            HummockInMemoryStrategy::Allowed => {
168                cmd.arg("--state-store").arg("hummock+memory");
169                (false, false)
170            }
171            HummockInMemoryStrategy::Disallowed => {
172                return Err(anyhow!(
173                    "{} is not compatible with in-memory state backend. Need to enable either minio or aws-s3.",
174                    id
175                ));
176            }
177        },
178        ([minio], [], []) => {
179            cmd.arg("--state-store").arg(format!(
180                "hummock+minio://{hummock_user}:{hummock_password}@{minio_addr}:{minio_port}/{hummock_bucket}",
181                hummock_user = minio.root_user,
182                hummock_password = minio.root_password,
183                hummock_bucket = minio.hummock_bucket,
184                minio_addr = minio.address,
185                minio_port = minio.port,
186            ));
187            (true, true)
188        }
189        ([], [aws_s3], []) => {
190            cmd.arg("--state-store")
191                .arg(format!("hummock+s3://{}", aws_s3.bucket));
192            (true, true)
193        }
194        ([], [], [opendal]) => {
195            if opendal.engine == "hdfs" {
196                cmd.arg("--state-store")
197                    .arg(format!("hummock+hdfs://{}", opendal.namenode));
198            } else if opendal.engine == "gcs" {
199                cmd.arg("--state-store")
200                    .arg(format!("hummock+gcs://{}", opendal.bucket));
201            } else if opendal.engine == "obs" {
202                cmd.arg("--state-store")
203                    .arg(format!("hummock+obs://{}", opendal.bucket));
204            } else if opendal.engine == "oss" {
205                cmd.arg("--state-store")
206                    .arg(format!("hummock+oss://{}", opendal.bucket));
207            } else if opendal.engine == "webhdfs" {
208                cmd.arg("--state-store")
209                    .arg(format!("hummock+webhdfs://{}", opendal.namenode));
210            } else if opendal.engine == "azblob" {
211                cmd.arg("--state-store")
212                    .arg(format!("hummock+azblob://{}", opendal.bucket));
213            } else if opendal.engine == "fs" {
214                println!("using fs engine xxxx");
215                cmd.arg("--state-store")
216                    .arg(format!("hummock+fs://{}", opendal.bucket));
217            } else {
218                unimplemented!()
219            }
220            (true, true)
221        }
222
223        (other_minio, other_s3, _) => {
224            return Err(anyhow!(
225                "{} minio and {} s3 instance found in config, but only 1 is needed",
226                other_minio.len(),
227                other_s3.len()
228            ));
229        }
230    };
231
232    Ok((is_shared_backend, is_persistent_backend))
233}