1use std::env;
16use std::path::Path;
17use std::process::Command;
18use std::sync::Once;
19
20use anyhow::{Result, anyhow};
21use itertools::Itertools;
22
23use super::ExecuteContext;
24use crate::util::is_env_set;
25use crate::{AwsS3Config, MetaNodeConfig, MinioConfig, MoatConfig, OpendalConfig, TempoConfig};
26
27impl<W> ExecuteContext<W>
28where
29 W: std::io::Write,
30{
31 pub fn risingwave_cmd(&mut self, component: &str) -> Result<Command> {
33 let mut cmd = if let Ok(tag) = env::var("USE_SYSTEM_RISINGWAVE")
34 && let Some(tag) = tag.strip_prefix("docker:")
35 {
36 let image = format!("risingwavelabs/risingwave:{}", tag);
37
38 self.pb
40 .set_message(format!("pulling docker image \"{image}\"..."));
41 static DOCKER_PULL: Once = Once::new();
42 DOCKER_PULL.call_once(|| {
43 let mut pull_cmd = Command::new("docker");
44 pull_cmd.arg("pull").arg(&image);
45 let output = pull_cmd.output().expect("Failed to pull docker image");
46 output.status.exit_ok().unwrap_or_else(|_| {
47 panic!(
48 "Failed to pull docker image: {}",
49 String::from_utf8_lossy(&output.stderr)
50 )
51 })
52 });
53
54 let wd = env::var("PREFIX")?; let name = format!("risedev-{}", self.id.as_ref().unwrap());
56
57 let mut cmd = Command::new("docker");
58 cmd.arg("run")
59 .arg("-it")
60 .arg("--rm")
61 .arg("--name")
62 .arg(&name)
63 .arg("--network")
64 .arg("host")
65 .arg("--cpus")
68 .arg("4")
69 .arg("--memory")
70 .arg("16g")
71 .arg("-v")
72 .arg(format!("{wd}:{wd}"))
73 .arg(&image)
74 .arg(component);
75 cmd
76 } else if is_env_set("USE_SYSTEM_RISINGWAVE") {
77 let mut cmd = Command::new("risingwave");
78 cmd.arg(component);
79 cmd
80 } else {
81 let prefix_bin = std::env::var("PREFIX_BIN")?;
82 let path = Path::new(&prefix_bin).join("risingwave").join(component);
83 Command::new(path)
84 };
85
86 if crate::util::is_enable_backtrace() {
87 cmd.env("RUST_BACKTRACE", "1");
88 }
89
90 if crate::util::is_env_set("ENABLE_BUILD_RW_CONNECTOR") {
91 let prefix_bin = env::var("PREFIX_BIN")?;
92 cmd.env(
93 "CONNECTOR_LIBS_PATH",
94 Path::new(&prefix_bin).join("connector-node/libs/"),
95 );
96 }
97
98 let prefix_config = env::var("PREFIX_CONFIG")?;
99 cmd.arg("--config-path")
100 .arg(Path::new(&prefix_config).join("risingwave.toml"));
101
102 Ok(cmd)
103 }
104}
105
106pub fn add_meta_node(provide_meta_node: &[MetaNodeConfig], cmd: &mut Command) -> Result<()> {
108 match provide_meta_node {
109 [] => {
110 return Err(anyhow!(
111 "Cannot configure node: no meta node found in this configuration."
112 ));
113 }
114 meta_nodes => {
115 cmd.arg("--meta-address").arg(
116 meta_nodes
117 .iter()
118 .map(|meta_node| format!("http://{}:{}", meta_node.address, meta_node.port))
119 .join(","),
120 );
121 }
122 };
123
124 Ok(())
125}
126
127pub fn add_tempo_endpoint(provide_tempo: &[TempoConfig], cmd: &mut Command) -> Result<()> {
129 match provide_tempo {
130 [] => {}
131 [tempo] => {
132 cmd.env(
133 "RW_TRACING_ENDPOINT",
134 format!("http://{}:{}", tempo.address, tempo.otlp_port),
135 );
136 }
137 _ => {
138 return Err(anyhow!(
139 "{} Tempo instance found in config, but only 1 is needed",
140 provide_tempo.len()
141 ));
142 }
143 }
144
145 Ok(())
146}
147
/// Controls what [`add_hummock_backend`] does when no storage backend (minio, aws-s3, opendal
/// or moat) is configured.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HummockInMemoryStrategy {
    /// Fall back to the in-memory state store (`hummock+memory`).
    Allowed,
    /// Refuse to fall back to the in-memory state store; `add_hummock_backend` returns an error
    /// instead.
    Disallowed,
}
155
156pub fn add_hummock_backend(
158 id: &str,
159 provide_opendal: &[OpendalConfig],
160 provide_minio: &[MinioConfig],
161 provide_aws_s3: &[AwsS3Config],
162 provide_moat: &[MoatConfig],
163 hummock_in_memory_strategy: HummockInMemoryStrategy,
164 cmd: &mut Command,
165) -> Result<(bool, bool)> {
166 let (is_shared_backend, is_persistent_backend) = match (
167 provide_minio,
168 provide_aws_s3,
169 provide_opendal,
170 provide_moat,
171 ) {
172 ([], [], [], []) => match hummock_in_memory_strategy {
173 HummockInMemoryStrategy::Allowed => {
174 cmd.arg("--state-store").arg("hummock+memory");
175 (false, false)
176 }
177 HummockInMemoryStrategy::Disallowed => {
178 return Err(anyhow!(
179 "{} is not compatible with in-memory state backend. Need to enable either minio or aws-s3.",
180 id
181 ));
182 }
183 },
184 ([minio], [], [], []) => {
185 cmd.arg("--state-store").arg(format!(
186 "hummock+minio://{hummock_user}:{hummock_password}@{minio_addr}:{minio_port}/{hummock_bucket}",
187 hummock_user = minio.root_user,
188 hummock_password = minio.root_password,
189 hummock_bucket = minio.hummock_bucket,
190 minio_addr = minio.address,
191 minio_port = minio.port,
192 ));
193 (true, true)
194 }
195 ([], [aws_s3], [], []) => {
196 cmd.arg("--state-store")
197 .arg(format!("hummock+s3://{}", aws_s3.bucket));
198 (true, true)
199 }
200 ([], [], [opendal], []) => {
201 if opendal.engine == "hdfs" {
202 cmd.arg("--state-store")
203 .arg(format!("hummock+hdfs://{}", opendal.namenode));
204 } else if opendal.engine == "gcs" {
205 cmd.arg("--state-store")
206 .arg(format!("hummock+gcs://{}", opendal.bucket));
207 } else if opendal.engine == "obs" {
208 cmd.arg("--state-store")
209 .arg(format!("hummock+obs://{}", opendal.bucket));
210 } else if opendal.engine == "oss" {
211 cmd.arg("--state-store")
212 .arg(format!("hummock+oss://{}", opendal.bucket));
213 } else if opendal.engine == "webhdfs" {
214 cmd.arg("--state-store")
215 .arg(format!("hummock+webhdfs://{}", opendal.namenode));
216 } else if opendal.engine == "azblob" {
217 cmd.arg("--state-store")
218 .arg(format!("hummock+azblob://{}", opendal.bucket));
219 } else if opendal.engine == "fs" {
220 println!("using fs engine xxxx");
221 cmd.arg("--state-store")
222 .arg(format!("hummock+fs://{}", opendal.bucket));
223 } else {
224 unimplemented!()
225 }
226 (true, true)
227 }
228 ([minio], _, _, [moat]) => {
229 cmd.arg("--state-store").arg(format!(
230 "hummock+minio://{hummock_user}:{hummock_password}@{moat_addr}:{moat_port}/{hummock_bucket}",
231 hummock_user = minio.root_user,
232 hummock_password = minio.root_password,
233 hummock_bucket = minio.hummock_bucket,
234 moat_addr = moat.address,
235 moat_port = moat.port,
236 ));
237 (true, true)
238 }
239 (other_minio, other_s3, _, _) => {
240 return Err(anyhow!(
241 "{} minio and {} s3 instance found in config, but only 1 is needed",
242 other_minio.len(),
243 other_s3.len()
244 ));
245 }
246 };
247
248 Ok((is_shared_backend, is_persistent_backend))
249}