1use std::env;
16use std::path::Path;
17use std::process::Command;
18use std::sync::Once;
19
20use anyhow::{Result, anyhow};
21use itertools::Itertools;
22
23use super::ExecuteContext;
24use crate::util::is_env_set;
25use crate::{AwsS3Config, MetaNodeConfig, MinioConfig, MoatConfig, OpendalConfig, TempoConfig};
26
27impl<W> ExecuteContext<W>
28where
29 W: std::io::Write,
30{
31 pub fn risingwave_cmd(&mut self, component: &str) -> Result<Command> {
33 let mut cmd = if let Ok(tag) = env::var("USE_SYSTEM_RISINGWAVE")
34 && let Some(tag) = tag.strip_prefix("docker:")
35 {
36 let image = format!("risingwavelabs/risingwave:{}", tag);
37
38 self.pb
40 .set_message(format!("pulling docker image \"{image}\"..."));
41 static DOCKER_PULL: Once = Once::new();
42 DOCKER_PULL.call_once(|| {
43 let mut pull_cmd = Command::new("docker");
44 pull_cmd.arg("pull").arg(&image);
45 let output = pull_cmd.output().expect("Failed to pull docker image");
46 output.status.exit_ok().unwrap_or_else(|_| {
47 panic!(
48 "Failed to pull docker image: {}",
49 String::from_utf8_lossy(&output.stderr)
50 )
51 })
52 });
53
54 let wd = env::var("PREFIX")?; let name = format!("risedev-{}", self.id.as_ref().unwrap());
56
57 let mut cmd = Command::new("docker");
58 cmd.arg("run")
59 .arg("-it")
60 .arg("--rm")
61 .arg("--name")
62 .arg(&name)
63 .arg("--network")
64 .arg("host")
65 .arg("--cpus")
66 .arg("4") .arg("-v")
68 .arg(format!("{wd}:{wd}"))
69 .arg(&image)
70 .arg(component);
71 cmd
72 } else if is_env_set("USE_SYSTEM_RISINGWAVE") {
73 let mut cmd = Command::new("risingwave");
74 cmd.arg(component);
75 cmd
76 } else {
77 let prefix_bin = std::env::var("PREFIX_BIN")?;
78 let path = Path::new(&prefix_bin).join("risingwave").join(component);
79 Command::new(path)
80 };
81
82 if crate::util::is_enable_backtrace() {
83 cmd.env("RUST_BACKTRACE", "1");
84 }
85
86 if crate::util::is_env_set("ENABLE_BUILD_RW_CONNECTOR") {
87 let prefix_bin = env::var("PREFIX_BIN")?;
88 cmd.env(
89 "CONNECTOR_LIBS_PATH",
90 Path::new(&prefix_bin).join("connector-node/libs/"),
91 );
92 }
93
94 let prefix_config = env::var("PREFIX_CONFIG")?;
95 cmd.arg("--config-path")
96 .arg(Path::new(&prefix_config).join("risingwave.toml"));
97
98 Ok(cmd)
99 }
100}
101
102pub fn add_meta_node(provide_meta_node: &[MetaNodeConfig], cmd: &mut Command) -> Result<()> {
104 match provide_meta_node {
105 [] => {
106 return Err(anyhow!(
107 "Cannot configure node: no meta node found in this configuration."
108 ));
109 }
110 meta_nodes => {
111 cmd.arg("--meta-address").arg(
112 meta_nodes
113 .iter()
114 .map(|meta_node| format!("http://{}:{}", meta_node.address, meta_node.port))
115 .join(","),
116 );
117 }
118 };
119
120 Ok(())
121}
122
123pub fn add_tempo_endpoint(provide_tempo: &[TempoConfig], cmd: &mut Command) -> Result<()> {
125 match provide_tempo {
126 [] => {}
127 [tempo] => {
128 cmd.env(
129 "RW_TRACING_ENDPOINT",
130 format!("http://{}:{}", tempo.address, tempo.otlp_port),
131 );
132 }
133 _ => {
134 return Err(anyhow!(
135 "{} Tempo instance found in config, but only 1 is needed",
136 provide_tempo.len()
137 ));
138 }
139 }
140
141 Ok(())
142}
143
144pub enum HummockInMemoryStrategy {
146 Allowed,
148 Disallowed,
150}
151
152pub fn add_hummock_backend(
154 id: &str,
155 provide_opendal: &[OpendalConfig],
156 provide_minio: &[MinioConfig],
157 provide_aws_s3: &[AwsS3Config],
158 provide_moat: &[MoatConfig],
159 hummock_in_memory_strategy: HummockInMemoryStrategy,
160 cmd: &mut Command,
161) -> Result<(bool, bool)> {
162 let (is_shared_backend, is_persistent_backend) = match (
163 provide_minio,
164 provide_aws_s3,
165 provide_opendal,
166 provide_moat,
167 ) {
168 ([], [], [], []) => match hummock_in_memory_strategy {
169 HummockInMemoryStrategy::Allowed => {
170 cmd.arg("--state-store").arg("hummock+memory");
171 (false, false)
172 }
173 HummockInMemoryStrategy::Disallowed => {
174 return Err(anyhow!(
175 "{} is not compatible with in-memory state backend. Need to enable either minio or aws-s3.",
176 id
177 ));
178 }
179 },
180 ([minio], [], [], []) => {
181 cmd.arg("--state-store").arg(format!(
182 "hummock+minio://{hummock_user}:{hummock_password}@{minio_addr}:{minio_port}/{hummock_bucket}",
183 hummock_user = minio.root_user,
184 hummock_password = minio.root_password,
185 hummock_bucket = minio.hummock_bucket,
186 minio_addr = minio.address,
187 minio_port = minio.port,
188 ));
189 (true, true)
190 }
191 ([], [aws_s3], [], []) => {
192 cmd.arg("--state-store")
193 .arg(format!("hummock+s3://{}", aws_s3.bucket));
194 (true, true)
195 }
196 ([], [], [opendal], []) => {
197 if opendal.engine == "hdfs" {
198 cmd.arg("--state-store")
199 .arg(format!("hummock+hdfs://{}", opendal.namenode));
200 } else if opendal.engine == "gcs" {
201 cmd.arg("--state-store")
202 .arg(format!("hummock+gcs://{}", opendal.bucket));
203 } else if opendal.engine == "obs" {
204 cmd.arg("--state-store")
205 .arg(format!("hummock+obs://{}", opendal.bucket));
206 } else if opendal.engine == "oss" {
207 cmd.arg("--state-store")
208 .arg(format!("hummock+oss://{}", opendal.bucket));
209 } else if opendal.engine == "webhdfs" {
210 cmd.arg("--state-store")
211 .arg(format!("hummock+webhdfs://{}", opendal.namenode));
212 } else if opendal.engine == "azblob" {
213 cmd.arg("--state-store")
214 .arg(format!("hummock+azblob://{}", opendal.bucket));
215 } else if opendal.engine == "fs" {
216 println!("using fs engine xxxx");
217 cmd.arg("--state-store")
218 .arg(format!("hummock+fs://{}", opendal.bucket));
219 } else {
220 unimplemented!()
221 }
222 (true, true)
223 }
224 ([minio], _, _, [moat]) => {
225 cmd.arg("--state-store").arg(format!(
226 "hummock+minio://{hummock_user}:{hummock_password}@{moat_addr}:{moat_port}/{hummock_bucket}",
227 hummock_user = minio.root_user,
228 hummock_password = minio.root_password,
229 hummock_bucket = minio.hummock_bucket,
230 moat_addr = moat.address,
231 moat_port = moat.port,
232 ));
233 (true, true)
234 }
235 (other_minio, other_s3, _, _) => {
236 return Err(anyhow!(
237 "{} minio and {} s3 instance found in config, but only 1 is needed",
238 other_minio.len(),
239 other_s3.len()
240 ));
241 }
242 };
243
244 Ok((is_shared_backend, is_persistent_backend))
245}