risedev/task/utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::env;
use std::path::Path;
use std::process::Command;
use std::sync::Once;

use anyhow::{Result, anyhow};
use itertools::Itertools;

use super::ExecuteContext;
use crate::util::is_env_set;
use crate::{AwsS3Config, MetaNodeConfig, MinioConfig, MoatConfig, OpendalConfig, TempoConfig};

impl<W> ExecuteContext<W>
where
    W: std::io::Write,
{
    /// Get the command for starting the given component of RisingWave.
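    ///
    /// The binary is resolved in this order: a Docker image when
    /// `USE_SYSTEM_RISINGWAVE=docker:<tag>` is set, the system-wide `risingwave`
    /// binary when `USE_SYSTEM_RISINGWAVE` is otherwise enabled, and the locally
    /// built binary under `PREFIX_BIN` by default.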
    pub fn risingwave_cmd(&mut self, component: &str) -> Result<Command> {
        let mut cmd = if let Ok(tag) = env::var("USE_SYSTEM_RISINGWAVE")
            && let Some(tag) = tag.strip_prefix("docker:")
        {
            let image = format!("risingwavelabs/risingwave:{}", tag);

            // Before returning the command, pull the image first.
            self.pb
                .set_message(format!("pulling docker image \"{image}\"..."));
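            // `Once` ensures the image is pulled at most once per risedev process,
            // even when multiple components use the same image.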
            static DOCKER_PULL: Once = Once::new();
            DOCKER_PULL.call_once(|| {
                let mut pull_cmd = Command::new("docker");
                pull_cmd.arg("pull").arg(&image);
                let output = pull_cmd.output().expect("Failed to pull docker image");
                output.status.exit_ok().unwrap_or_else(|_| {
                    panic!(
                        "Failed to pull docker image: {}",
                        String::from_utf8_lossy(&output.stderr)
                    )
                })
            });

            let wd = env::var("PREFIX")?; // pass through the working directory
            let name = format!("risedev-{}", self.id.as_ref().unwrap());

            let mut cmd = Command::new("docker");
            cmd.arg("run")
                .arg("-it")
                .arg("--rm")
                .arg("--name")
                .arg(&name)
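                // Host networking lets the containerized component listen on the
                // same addresses as a natively running one.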
                .arg("--network")
                .arg("host")
                .arg("--cpus")
                .arg("4") // the release build has a premium license that only works with <= 4 CPUs
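                // Mount the risedev working directory at the same path inside the
                // container so configuration and data paths resolve identically.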
                .arg("-v")
                .arg(format!("{wd}:{wd}"))
                .arg(&image)
                .arg(component);
            cmd
        } else if is_env_set("USE_SYSTEM_RISINGWAVE") {
            let mut cmd = Command::new("risingwave");
            cmd.arg(component);
            cmd
        } else {
            let prefix_bin = std::env::var("PREFIX_BIN")?;
            let path = Path::new(&prefix_bin).join("risingwave").join(component);
            Command::new(path)
        };

        if crate::util::is_enable_backtrace() {
            cmd.env("RUST_BACKTRACE", "1");
        }

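        // If the connector node was built locally, point RisingWave at its library
        // directory via `CONNECTOR_LIBS_PATH`.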
        if crate::util::is_env_set("ENABLE_BUILD_RW_CONNECTOR") {
            let prefix_bin = env::var("PREFIX_BIN")?;
            cmd.env(
                "CONNECTOR_LIBS_PATH",
                Path::new(&prefix_bin).join("connector-node/libs/"),
            );
        }

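        // Every component reads the shared `risingwave.toml` under `PREFIX_CONFIG`.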
        let prefix_config = env::var("PREFIX_CONFIG")?;
        cmd.arg("--config-path")
            .arg(Path::new(&prefix_config).join("risingwave.toml"));

        Ok(cmd)
    }
}

/// Add the meta node address(es) to the parameters.
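///
/// Illustrative sketch of a call site (variable names and addresses are assumptions,
/// not taken from a real profile):
///
/// ```ignore
/// // `provide_meta_node` comes from the parsed risedev profile; `cmd` is the
/// // component's `Command` being assembled. With meta nodes at 127.0.0.1:5690 and
/// // 127.0.0.1:5691, this appends
/// // `--meta-address http://127.0.0.1:5690,http://127.0.0.1:5691`.
/// add_meta_node(&provide_meta_node, &mut cmd)?;
/// ```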
pub fn add_meta_node(provide_meta_node: &[MetaNodeConfig], cmd: &mut Command) -> Result<()> {
    match provide_meta_node {
        [] => {
            return Err(anyhow!(
                "Cannot configure node: no meta node found in this configuration."
            ));
        }
        meta_nodes => {
            cmd.arg("--meta-address").arg(
                meta_nodes
                    .iter()
                    .map(|meta_node| format!("http://{}:{}", meta_node.address, meta_node.port))
                    .join(","),
            );
        }
    };

    Ok(())
}

/// Add the tempo endpoint to the environment variables.
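///
/// At most one Tempo instance is supported; its OTLP endpoint is exported as
/// `RW_TRACING_ENDPOINT`.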
pub fn add_tempo_endpoint(provide_tempo: &[TempoConfig], cmd: &mut Command) -> Result<()> {
    match provide_tempo {
        [] => {}
        [tempo] => {
            cmd.env(
                "RW_TRACING_ENDPOINT",
                format!("http://{}:{}", tempo.address, tempo.otlp_port),
            );
        }
        _ => {
            return Err(anyhow!(
                "{} Tempo instances found in config, but only 1 is needed",
                provide_tempo.len()
            ));
        }
    }

    Ok(())
}

/// Strategy for whether to enable in-memory hummock when neither minio nor s3 is provided.
pub enum HummockInMemoryStrategy {
    /// Allow in-memory hummock. Used by the single-node configuration.
    Allowed,
    /// Disallow in-memory hummock. Always requires minio or s3.
    Disallowed,
}

/// Add a hummock storage backend to the parameters. Returns `(is_shared_backend, is_persistent_backend)`.
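///
/// Illustrative sketch of a call site (variable names and the resulting URI are
/// assumptions for demonstration only):
///
/// ```ignore
/// // With a single minio provider, this would set something like
/// // `--state-store hummock+minio://user:password@127.0.0.1:9301/hummock001`.
/// let (is_shared, is_persistent) = add_hummock_backend(
///     &config.id,
///     &provide_opendal,
///     &provide_minio,
///     &provide_aws_s3,
///     &provide_moat,
///     HummockInMemoryStrategy::Disallowed,
///     &mut cmd,
/// )?;
/// ```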
pub fn add_hummock_backend(
    id: &str,
    provide_opendal: &[OpendalConfig],
    provide_minio: &[MinioConfig],
    provide_aws_s3: &[AwsS3Config],
    provide_moat: &[MoatConfig],
    hummock_in_memory_strategy: HummockInMemoryStrategy,
    cmd: &mut Command,
) -> Result<(bool, bool)> {
    let (is_shared_backend, is_persistent_backend) = match (
        provide_minio,
        provide_aws_s3,
        provide_opendal,
        provide_moat,
    ) {
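        // Dispatch on which object stores the profile provides, in the order
        // (minio, aws-s3, opendal, moat).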
        ([], [], [], []) => match hummock_in_memory_strategy {
            HummockInMemoryStrategy::Allowed => {
                cmd.arg("--state-store").arg("hummock+memory");
                (false, false)
            }
            HummockInMemoryStrategy::Disallowed => {
                return Err(anyhow!(
                    "{} is not compatible with in-memory state backend. Need to enable either minio or aws-s3.",
                    id
                ));
            }
        },
        ([minio], [], [], []) => {
            cmd.arg("--state-store").arg(format!(
                "hummock+minio://{hummock_user}:{hummock_password}@{minio_addr}:{minio_port}/{hummock_bucket}",
                hummock_user = minio.root_user,
                hummock_password = minio.root_password,
                hummock_bucket = minio.hummock_bucket,
                minio_addr = minio.address,
                minio_port = minio.port,
            ));
            (true, true)
        }
        ([], [aws_s3], [], []) => {
            cmd.arg("--state-store")
                .arg(format!("hummock+s3://{}", aws_s3.bucket));
            (true, true)
        }
        ([], [], [opendal], []) => {
            let state_store = match opendal.engine.as_str() {
                "hdfs" => format!("hummock+hdfs://{}", opendal.namenode),
                "gcs" => format!("hummock+gcs://{}", opendal.bucket),
                "obs" => format!("hummock+obs://{}", opendal.bucket),
                "oss" => format!("hummock+oss://{}", opendal.bucket),
                "webhdfs" => format!("hummock+webhdfs://{}", opendal.namenode),
                "azblob" => format!("hummock+azblob://{}", opendal.bucket),
                "fs" => {
                    println!("using fs engine");
                    format!("hummock+fs://{}", opendal.bucket)
                }
                other => unimplemented!("unsupported opendal engine: {}", other),
            };
            cmd.arg("--state-store").arg(state_store);
            (true, true)
        }
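        // When moat is provided alongside minio, the minio credentials and bucket are
        // reused, but the endpoint points at moat's address and port.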
        ([minio], _, _, [moat]) => {
            cmd.arg("--state-store").arg(format!(
                "hummock+minio://{hummock_user}:{hummock_password}@{moat_addr}:{moat_port}/{hummock_bucket}",
                hummock_user = minio.root_user,
                hummock_password = minio.root_password,
                hummock_bucket = minio.hummock_bucket,
                moat_addr = moat.address,
                moat_port = moat.port,
            ));
            (true, true)
        }
        (other_minio, other_s3, _, _) => {
            return Err(anyhow!(
                "{} minio and {} s3 instances found in config, but only 1 is needed",
                other_minio.len(),
                other_s3.len()
            ));
        }
    };

    Ok((is_shared_backend, is_persistent_backend))
}