risedev/task/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::path::Path;
17use std::process::Command;
18
19use anyhow::{Result, anyhow};
20use itertools::Itertools;
21
22use crate::util::is_env_set;
23use crate::{AwsS3Config, MetaNodeConfig, MinioConfig, OpendalConfig, TempoConfig};
24
25/// Get the command for starting the given component of RisingWave.
26pub fn risingwave_cmd(component: &str) -> Result<Command> {
27    let mut cmd = if is_env_set("USE_SYSTEM_RISINGWAVE") {
28        let mut cmd = Command::new("risingwave");
29        cmd.arg(component);
30        cmd
31    } else {
32        let prefix_bin = std::env::var("PREFIX_BIN")?;
33        let path = Path::new(&prefix_bin).join("risingwave").join(component);
34        Command::new(path)
35    };
36
37    if crate::util::is_enable_backtrace() {
38        cmd.env("RUST_BACKTRACE", "1");
39    }
40
41    if crate::util::is_env_set("ENABLE_BUILD_RW_CONNECTOR") {
42        let prefix_bin = env::var("PREFIX_BIN")?;
43        cmd.env(
44            "CONNECTOR_LIBS_PATH",
45            Path::new(&prefix_bin).join("connector-node/libs/"),
46        );
47    }
48
49    let prefix_config = env::var("PREFIX_CONFIG")?;
50    cmd.arg("--config-path")
51        .arg(Path::new(&prefix_config).join("risingwave.toml"));
52
53    Ok(cmd)
54}
55
56/// Add a meta node to the parameters.
57pub fn add_meta_node(provide_meta_node: &[MetaNodeConfig], cmd: &mut Command) -> Result<()> {
58    match provide_meta_node {
59        [] => {
60            return Err(anyhow!(
61                "Cannot configure node: no meta node found in this configuration."
62            ));
63        }
64        meta_nodes => {
65            cmd.arg("--meta-address").arg(
66                meta_nodes
67                    .iter()
68                    .map(|meta_node| format!("http://{}:{}", meta_node.address, meta_node.port))
69                    .join(","),
70            );
71        }
72    };
73
74    Ok(())
75}
76
77/// Add the tempo endpoint to the environment variables.
78pub fn add_tempo_endpoint(provide_tempo: &[TempoConfig], cmd: &mut Command) -> Result<()> {
79    match provide_tempo {
80        [] => {}
81        [tempo] => {
82            cmd.env(
83                "RW_TRACING_ENDPOINT",
84                format!("http://{}:{}", tempo.address, tempo.otlp_port),
85            );
86        }
87        _ => {
88            return Err(anyhow!(
89                "{} Tempo instance found in config, but only 1 is needed",
90                provide_tempo.len()
91            ));
92        }
93    }
94
95    Ok(())
96}
97
98/// Strategy for whether to enable in-memory hummock if no minio and s3 is provided.
99pub enum HummockInMemoryStrategy {
100    /// Enable isolated in-memory hummock. Used by single-node configuration.
101    Isolated,
102    /// Enable in-memory hummock shared in a single process. Used by risedev playground and
103    /// deterministic end-to-end tests.
104    Shared,
105    /// Disallow in-memory hummock. Always requires minio or s3.
106    Disallowed,
107}
108
109/// Add a hummock storage backend to the parameters. Returns whether this is a shared backend.
110pub fn add_hummock_backend(
111    id: &str,
112    provide_opendal: &[OpendalConfig],
113    provide_minio: &[MinioConfig],
114    provide_aws_s3: &[AwsS3Config],
115    hummock_in_memory_strategy: HummockInMemoryStrategy,
116    cmd: &mut Command,
117) -> Result<(bool, bool)> {
118    let (is_shared_backend, is_persistent_backend) = match (
119        provide_minio,
120        provide_aws_s3,
121        provide_opendal,
122    ) {
123        ([], [], []) => match hummock_in_memory_strategy {
124            HummockInMemoryStrategy::Isolated => {
125                cmd.arg("--state-store").arg("hummock+memory");
126                (false, false)
127            }
128            HummockInMemoryStrategy::Shared => {
129                cmd.arg("--state-store").arg("hummock+memory-shared");
130                (true, false)
131            }
132            HummockInMemoryStrategy::Disallowed => {
133                return Err(anyhow!(
134                    "{} is not compatible with in-memory state backend. Need to enable either minio or aws-s3.",
135                    id
136                ));
137            }
138        },
139        ([minio], [], []) => {
140            cmd.arg("--state-store").arg(format!(
141                "hummock+minio://{hummock_user}:{hummock_password}@{minio_addr}:{minio_port}/{hummock_bucket}",
142                hummock_user = minio.root_user,
143                hummock_password = minio.root_password,
144                hummock_bucket = minio.hummock_bucket,
145                minio_addr = minio.address,
146                minio_port = minio.port,
147            ));
148            (true, true)
149        }
150        ([], [aws_s3], []) => {
151            cmd.arg("--state-store")
152                .arg(format!("hummock+s3://{}", aws_s3.bucket));
153            (true, true)
154        }
155        ([], [], [opendal]) => {
156            if opendal.engine == "hdfs" {
157                cmd.arg("--state-store")
158                    .arg(format!("hummock+hdfs://{}", opendal.namenode));
159            } else if opendal.engine == "gcs" {
160                cmd.arg("--state-store")
161                    .arg(format!("hummock+gcs://{}", opendal.bucket));
162            } else if opendal.engine == "obs" {
163                cmd.arg("--state-store")
164                    .arg(format!("hummock+obs://{}", opendal.bucket));
165            } else if opendal.engine == "oss" {
166                cmd.arg("--state-store")
167                    .arg(format!("hummock+oss://{}", opendal.bucket));
168            } else if opendal.engine == "webhdfs" {
169                cmd.arg("--state-store")
170                    .arg(format!("hummock+webhdfs://{}", opendal.namenode));
171            } else if opendal.engine == "azblob" {
172                cmd.arg("--state-store")
173                    .arg(format!("hummock+azblob://{}", opendal.bucket));
174            } else if opendal.engine == "fs" {
175                println!("using fs engine xxxx");
176                cmd.arg("--state-store")
177                    .arg(format!("hummock+fs://{}", opendal.bucket));
178            } else {
179                unimplemented!()
180            }
181            (true, true)
182        }
183
184        (other_minio, other_s3, _) => {
185            return Err(anyhow!(
186                "{} minio and {} s3 instance found in config, but only 1 is needed",
187                other_minio.len(),
188                other_s3.len()
189            ));
190        }
191    };
192
193    Ok((is_shared_backend, is_persistent_backend))
194}