1use std::env;
16use std::path::Path;
17use std::process::Command;
18
19use anyhow::{Result, anyhow};
20use itertools::Itertools;
21
22use crate::util::is_env_set;
23use crate::{AwsS3Config, MetaNodeConfig, MinioConfig, OpendalConfig, TempoConfig};
24
25pub fn risingwave_cmd(component: &str) -> Result<Command> {
27 let mut cmd = if is_env_set("USE_SYSTEM_RISINGWAVE") {
28 let mut cmd = Command::new("risingwave");
29 cmd.arg(component);
30 cmd
31 } else {
32 let prefix_bin = std::env::var("PREFIX_BIN")?;
33 let path = Path::new(&prefix_bin).join("risingwave").join(component);
34 Command::new(path)
35 };
36
37 if crate::util::is_enable_backtrace() {
38 cmd.env("RUST_BACKTRACE", "1");
39 }
40
41 if crate::util::is_env_set("ENABLE_BUILD_RW_CONNECTOR") {
42 let prefix_bin = env::var("PREFIX_BIN")?;
43 cmd.env(
44 "CONNECTOR_LIBS_PATH",
45 Path::new(&prefix_bin).join("connector-node/libs/"),
46 );
47 }
48
49 let prefix_config = env::var("PREFIX_CONFIG")?;
50 cmd.arg("--config-path")
51 .arg(Path::new(&prefix_config).join("risingwave.toml"));
52
53 Ok(cmd)
54}
55
56pub fn add_meta_node(provide_meta_node: &[MetaNodeConfig], cmd: &mut Command) -> Result<()> {
58 match provide_meta_node {
59 [] => {
60 return Err(anyhow!(
61 "Cannot configure node: no meta node found in this configuration."
62 ));
63 }
64 meta_nodes => {
65 cmd.arg("--meta-address").arg(
66 meta_nodes
67 .iter()
68 .map(|meta_node| format!("http://{}:{}", meta_node.address, meta_node.port))
69 .join(","),
70 );
71 }
72 };
73
74 Ok(())
75}
76
77pub fn add_tempo_endpoint(provide_tempo: &[TempoConfig], cmd: &mut Command) -> Result<()> {
79 match provide_tempo {
80 [] => {}
81 [tempo] => {
82 cmd.env(
83 "RW_TRACING_ENDPOINT",
84 format!("http://{}:{}", tempo.address, tempo.otlp_port),
85 );
86 }
87 _ => {
88 return Err(anyhow!(
89 "{} Tempo instance found in config, but only 1 is needed",
90 provide_tempo.len()
91 ));
92 }
93 }
94
95 Ok(())
96}
97
/// Selects what `add_hummock_backend` does when no minio, AWS S3 or opendal backend is
/// configured.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HummockInMemoryStrategy {
    /// Use the `hummock+memory` state store; the backend is reported as not shared.
    Isolated,
    /// Use the `hummock+memory-shared` state store; the backend is reported as shared.
    Shared,
    /// Refuse to fall back to an in-memory state store and return an error instead.
    Disallowed,
}
108
109pub fn add_hummock_backend(
111 id: &str,
112 provide_opendal: &[OpendalConfig],
113 provide_minio: &[MinioConfig],
114 provide_aws_s3: &[AwsS3Config],
115 hummock_in_memory_strategy: HummockInMemoryStrategy,
116 cmd: &mut Command,
117) -> Result<(bool, bool)> {
118 let (is_shared_backend, is_persistent_backend) = match (
119 provide_minio,
120 provide_aws_s3,
121 provide_opendal,
122 ) {
123 ([], [], []) => match hummock_in_memory_strategy {
124 HummockInMemoryStrategy::Isolated => {
125 cmd.arg("--state-store").arg("hummock+memory");
126 (false, false)
127 }
128 HummockInMemoryStrategy::Shared => {
129 cmd.arg("--state-store").arg("hummock+memory-shared");
130 (true, false)
131 }
132 HummockInMemoryStrategy::Disallowed => {
133 return Err(anyhow!(
134 "{} is not compatible with in-memory state backend. Need to enable either minio or aws-s3.",
135 id
136 ));
137 }
138 },
139 ([minio], [], []) => {
140 cmd.arg("--state-store").arg(format!(
141 "hummock+minio://{hummock_user}:{hummock_password}@{minio_addr}:{minio_port}/{hummock_bucket}",
142 hummock_user = minio.root_user,
143 hummock_password = minio.root_password,
144 hummock_bucket = minio.hummock_bucket,
145 minio_addr = minio.address,
146 minio_port = minio.port,
147 ));
148 (true, true)
149 }
150 ([], [aws_s3], []) => {
151 cmd.arg("--state-store")
152 .arg(format!("hummock+s3://{}", aws_s3.bucket));
153 (true, true)
154 }
155 ([], [], [opendal]) => {
156 if opendal.engine == "hdfs" {
157 cmd.arg("--state-store")
158 .arg(format!("hummock+hdfs://{}", opendal.namenode));
159 } else if opendal.engine == "gcs" {
160 cmd.arg("--state-store")
161 .arg(format!("hummock+gcs://{}", opendal.bucket));
162 } else if opendal.engine == "obs" {
163 cmd.arg("--state-store")
164 .arg(format!("hummock+obs://{}", opendal.bucket));
165 } else if opendal.engine == "oss" {
166 cmd.arg("--state-store")
167 .arg(format!("hummock+oss://{}", opendal.bucket));
168 } else if opendal.engine == "webhdfs" {
169 cmd.arg("--state-store")
170 .arg(format!("hummock+webhdfs://{}", opendal.namenode));
171 } else if opendal.engine == "azblob" {
172 cmd.arg("--state-store")
173 .arg(format!("hummock+azblob://{}", opendal.bucket));
174 } else if opendal.engine == "fs" {
175 println!("using fs engine xxxx");
176 cmd.arg("--state-store")
177 .arg(format!("hummock+fs://{}", opendal.bucket));
178 } else {
179 unimplemented!()
180 }
181 (true, true)
182 }
183
184 (other_minio, other_s3, _) => {
185 return Err(anyhow!(
186 "{} minio and {} s3 instance found in config, but only 1 is needed",
187 other_minio.len(),
188 other_s3.len()
189 ));
190 }
191 };
192
193 Ok((is_shared_backend, is_persistent_backend))
194}