1use std::env;
16use std::path::Path;
17use std::process::Command;
18use std::sync::Once;
19
20use anyhow::{Result, anyhow};
21use itertools::Itertools;
22
23use super::ExecuteContext;
24use crate::util::is_env_set;
25use crate::{AwsS3Config, MetaNodeConfig, MinioConfig, OpendalConfig, TempoConfig};
26
27impl<W> ExecuteContext<W>
28where
29 W: std::io::Write,
30{
31 pub fn risingwave_cmd(&mut self, component: &str) -> Result<Command> {
33 let mut cmd = if let Ok(tag) = env::var("USE_SYSTEM_RISINGWAVE")
34 && let Some(tag) = tag.strip_prefix("docker:")
35 {
36 let image = format!("risingwavelabs/risingwave:{}", tag);
37
38 self.pb
40 .set_message(format!("pulling docker image \"{image}\"..."));
41 static DOCKER_PULL: Once = Once::new();
42 DOCKER_PULL.call_once(|| {
43 let mut pull_cmd = Command::new("docker");
44 pull_cmd.arg("pull").arg(&image);
45 let output = pull_cmd.output().expect("Failed to pull docker image");
46 output.status.exit_ok().unwrap_or_else(|_| {
47 panic!(
48 "Failed to pull docker image: {}",
49 String::from_utf8_lossy(&output.stderr)
50 )
51 })
52 });
53
54 let wd = env::var("PREFIX")?; let name = format!("risedev-{}", self.id.as_ref().unwrap());
56
57 let mut cmd = Command::new("docker");
58 cmd.arg("run")
59 .arg("-it")
60 .arg("--rm")
61 .arg("--name")
62 .arg(&name)
63 .arg("--network")
64 .arg("host")
65 .arg("--cpus")
66 .arg("4") .arg("-v")
68 .arg(format!("{wd}:{wd}"))
69 .arg(&image)
70 .arg(component);
71 cmd
72 } else if is_env_set("USE_SYSTEM_RISINGWAVE") {
73 let mut cmd = Command::new("risingwave");
74 cmd.arg(component);
75 cmd
76 } else {
77 let prefix_bin = std::env::var("PREFIX_BIN")?;
78 let path = Path::new(&prefix_bin).join("risingwave").join(component);
79 Command::new(path)
80 };
81
82 if crate::util::is_enable_backtrace() {
83 cmd.env("RUST_BACKTRACE", "1");
84 }
85
86 if crate::util::is_env_set("ENABLE_BUILD_RW_CONNECTOR") {
87 let prefix_bin = env::var("PREFIX_BIN")?;
88 cmd.env(
89 "CONNECTOR_LIBS_PATH",
90 Path::new(&prefix_bin).join("connector-node/libs/"),
91 );
92 }
93
94 let prefix_config = env::var("PREFIX_CONFIG")?;
95 cmd.arg("--config-path")
96 .arg(Path::new(&prefix_config).join("risingwave.toml"));
97
98 Ok(cmd)
99 }
100}
101
102pub fn add_meta_node(provide_meta_node: &[MetaNodeConfig], cmd: &mut Command) -> Result<()> {
104 match provide_meta_node {
105 [] => {
106 return Err(anyhow!(
107 "Cannot configure node: no meta node found in this configuration."
108 ));
109 }
110 meta_nodes => {
111 cmd.arg("--meta-address").arg(
112 meta_nodes
113 .iter()
114 .map(|meta_node| format!("http://{}:{}", meta_node.address, meta_node.port))
115 .join(","),
116 );
117 }
118 };
119
120 Ok(())
121}
122
123pub fn add_tempo_endpoint(provide_tempo: &[TempoConfig], cmd: &mut Command) -> Result<()> {
125 match provide_tempo {
126 [] => {}
127 [tempo] => {
128 cmd.env(
129 "RW_TRACING_ENDPOINT",
130 format!("http://{}:{}", tempo.address, tempo.otlp_port),
131 );
132 }
133 _ => {
134 return Err(anyhow!(
135 "{} Tempo instance found in config, but only 1 is needed",
136 provide_tempo.len()
137 ));
138 }
139 }
140
141 Ok(())
142}
143
/// Decides what `add_hummock_backend` does when no minio, aws-s3 or opendal instance is
/// provided.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HummockInMemoryStrategy {
    /// Fall back to the `hummock+memory` state store.
    Allowed,
    /// Reject the configuration with an error instead of using an in-memory state store.
    Disallowed,
}
151
152pub fn add_hummock_backend(
154 id: &str,
155 provide_opendal: &[OpendalConfig],
156 provide_minio: &[MinioConfig],
157 provide_aws_s3: &[AwsS3Config],
158 hummock_in_memory_strategy: HummockInMemoryStrategy,
159 cmd: &mut Command,
160) -> Result<(bool, bool)> {
161 let (is_shared_backend, is_persistent_backend) = match (
162 provide_minio,
163 provide_aws_s3,
164 provide_opendal,
165 ) {
166 ([], [], []) => match hummock_in_memory_strategy {
167 HummockInMemoryStrategy::Allowed => {
168 cmd.arg("--state-store").arg("hummock+memory");
169 (false, false)
170 }
171 HummockInMemoryStrategy::Disallowed => {
172 return Err(anyhow!(
173 "{} is not compatible with in-memory state backend. Need to enable either minio or aws-s3.",
174 id
175 ));
176 }
177 },
178 ([minio], [], []) => {
179 cmd.arg("--state-store").arg(format!(
180 "hummock+minio://{hummock_user}:{hummock_password}@{minio_addr}:{minio_port}/{hummock_bucket}",
181 hummock_user = minio.root_user,
182 hummock_password = minio.root_password,
183 hummock_bucket = minio.hummock_bucket,
184 minio_addr = minio.address,
185 minio_port = minio.port,
186 ));
187 (true, true)
188 }
189 ([], [aws_s3], []) => {
190 cmd.arg("--state-store")
191 .arg(format!("hummock+s3://{}", aws_s3.bucket));
192 (true, true)
193 }
194 ([], [], [opendal]) => {
195 if opendal.engine == "hdfs" {
196 cmd.arg("--state-store")
197 .arg(format!("hummock+hdfs://{}", opendal.namenode));
198 } else if opendal.engine == "gcs" {
199 cmd.arg("--state-store")
200 .arg(format!("hummock+gcs://{}", opendal.bucket));
201 } else if opendal.engine == "obs" {
202 cmd.arg("--state-store")
203 .arg(format!("hummock+obs://{}", opendal.bucket));
204 } else if opendal.engine == "oss" {
205 cmd.arg("--state-store")
206 .arg(format!("hummock+oss://{}", opendal.bucket));
207 } else if opendal.engine == "webhdfs" {
208 cmd.arg("--state-store")
209 .arg(format!("hummock+webhdfs://{}", opendal.namenode));
210 } else if opendal.engine == "azblob" {
211 cmd.arg("--state-store")
212 .arg(format!("hummock+azblob://{}", opendal.bucket));
213 } else if opendal.engine == "fs" {
214 println!("using fs engine xxxx");
215 cmd.arg("--state-store")
216 .arg(format!("hummock+fs://{}", opendal.bucket));
217 } else {
218 unimplemented!()
219 }
220 (true, true)
221 }
222
223 (other_minio, other_s3, _) => {
224 return Err(anyhow!(
225 "{} minio and {} s3 instance found in config, but only 1 is needed",
226 other_minio.len(),
227 other_s3.len()
228 ));
229 }
230 };
231
232 Ok((is_shared_backend, is_persistent_backend))
233}