1use std::collections::BTreeMap;
18use std::path::Path;
19use std::process::Command;
20
21use anyhow::{Result, anyhow};
22use serde::{Deserialize, Serialize};
23
24use crate::{
25 CompactorConfig, CompactorService, ComputeNodeConfig, ComputeNodeService, FrontendConfig,
26 FrontendService, GrafanaConfig, GrafanaGen, HummockInMemoryStrategy, MetaNodeConfig,
27 MetaNodeService, MinioConfig, MinioService, PrometheusConfig, PrometheusGen, PrometheusService,
28 RedPandaConfig, TempoConfig, TempoGen, TempoService,
29};
30
31#[serde_with::skip_serializing_none]
32#[derive(Debug, Clone, Serialize, Default)]
33pub struct ComposeService {
34 pub image: String,
35 pub command: Vec<String>,
36 pub expose: Vec<String>,
37 pub ports: Vec<String>,
38 pub depends_on: Vec<String>,
39 pub volumes: Vec<String>,
40 pub entrypoint: Option<String>,
41 pub environment: BTreeMap<String, String>,
42 pub user: Option<String>,
43 pub container_name: String,
44 pub network_mode: Option<String>,
45 pub healthcheck: Option<HealthCheck>,
46}
47
48#[derive(Debug, Clone, Serialize, Default)]
49pub struct HealthCheck {
50 test: Vec<String>,
51 interval: String,
52 timeout: String,
53 retries: usize,
54}
55
56#[derive(Debug, Clone, Serialize)]
57pub struct ComposeFile {
58 pub services: BTreeMap<String, ComposeService>,
59 pub volumes: BTreeMap<String, ComposeVolume>,
60 pub name: String,
61}
62
63#[derive(Debug, Clone, Deserialize)]
64#[serde(rename_all = "kebab-case")]
65#[serde(deny_unknown_fields)]
66pub struct DockerImageConfig {
67 pub risingwave: String,
68 pub prometheus: String,
69 pub grafana: String,
70 pub tempo: String,
71 pub minio: String,
72 pub redpanda: String,
73}
74
75#[derive(Debug)]
76pub struct ComposeConfig {
77 pub image: DockerImageConfig,
79
80 pub config_directory: String,
83
84 pub rw_config_path: Option<String>,
86}
87
88#[derive(Debug, Clone, Serialize, Default)]
89pub struct ComposeVolume {
90 pub external: bool,
91}
92
93pub trait Compose {
95 fn compose(&self, config: &ComposeConfig) -> Result<ComposeService>;
96}
97
98fn get_cmd_args(cmd: &Command, with_argv_0: bool) -> Result<Vec<String>> {
99 let mut result = vec![];
100 if with_argv_0 {
101 result.push(
102 cmd.get_program()
103 .to_str()
104 .ok_or_else(|| anyhow!("Cannot convert to UTF-8 string"))?
105 .to_owned(),
106 );
107 }
108 for arg in cmd.get_args() {
109 result.push(
110 arg.to_str()
111 .ok_or_else(|| anyhow!("Cannot convert to UTF-8 string"))?
112 .to_owned(),
113 );
114 }
115 Ok(result)
116}
117
118fn get_cmd_envs(cmd: &Command, rust_backtrace: bool) -> Result<BTreeMap<String, String>> {
119 let mut result = BTreeMap::new();
120 if rust_backtrace {
121 result.insert("RUST_BACKTRACE".to_owned(), "1".to_owned());
122 }
123
124 for (k, v) in cmd.get_envs() {
125 let k = k
126 .to_str()
127 .ok_or_else(|| anyhow!("Cannot convert to UTF-8 string"))?
128 .to_owned();
129 let v = if let Some(v) = v {
130 Some(
131 v.to_str()
132 .ok_or_else(|| anyhow!("Cannot convert to UTF-8 string"))?
133 .to_owned(),
134 )
135 } else {
136 None
137 };
138 result.insert(k, v.unwrap_or_default());
139 }
140 Ok(result)
141}
142
143fn health_check_port(port: u16) -> HealthCheck {
144 HealthCheck {
145 test: vec![
146 "CMD-SHELL".into(),
147 format!(
148 "bash -c 'printf \"GET / HTTP/1.1\\n\\n\" > /dev/tcp/127.0.0.1/{}; exit $?;'",
149 port
150 ),
151 ],
152 interval: "1s".to_owned(),
153 timeout: "5s".to_owned(),
154 retries: 5,
155 }
156}
157
158fn health_check_port_prometheus(port: u16) -> HealthCheck {
159 HealthCheck {
160 test: vec![
161 "CMD-SHELL".into(),
162 format!(
163 "sh -c 'printf \"GET /-/healthy HTTP/1.0\\n\\n\" | nc localhost {}; exit $?;'",
164 port
165 ),
166 ],
167 interval: "1s".to_owned(),
168 timeout: "5s".to_owned(),
169 retries: 5,
170 }
171}
172
173impl Compose for ComputeNodeConfig {
174 fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
175 let mut command = Command::new("compute-node");
176 ComputeNodeService::apply_command_args(&mut command, self)?;
177 if self.enable_tiered_cache {
178 command.arg("--data-file-cache-dir").arg("/foyer/data");
179 command.arg("--meta-file-cache-dir").arg("/foyer/meta");
180 }
181
182 if let Some(c) = &config.rw_config_path {
183 let target = Path::new(&config.config_directory).join("risingwave.toml");
184 fs_err::copy(c, target)?;
185 command.arg("--config-path").arg("/risingwave.toml");
186 }
187
188 let environment = get_cmd_envs(&command, true)?;
189 let command = get_cmd_args(&command, true)?;
190
191 let provide_meta_node = self.provide_meta_node.as_ref().unwrap();
192 let provide_minio = self.provide_minio.as_ref().unwrap();
193
194 Ok(ComposeService {
195 image: config.image.risingwave.clone(),
196 environment,
197 volumes: [
198 "./risingwave.toml:/risingwave.toml".to_owned(),
199 format!("{}:/filecache", self.id),
200 ]
201 .into_iter()
202 .collect(),
203 command,
204 expose: vec![self.port.to_string(), self.exporter_port.to_string()],
205 depends_on: provide_meta_node
206 .iter()
207 .map(|x| x.id.clone())
208 .chain(provide_minio.iter().map(|x| x.id.clone()))
209 .collect(),
210 healthcheck: Some(health_check_port(self.port)),
211 ..Default::default()
212 })
213 }
214}
215
216impl Compose for MetaNodeConfig {
217 fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
218 let mut command = Command::new("meta-node");
219 MetaNodeService::apply_command_args(
220 &mut command,
221 self,
222 HummockInMemoryStrategy::Disallowed,
223 )?;
224
225 if let Some(c) = &config.rw_config_path {
226 let target = Path::new(&config.config_directory).join("risingwave.toml");
227 fs_err::copy(c, target)?;
228 command.arg("--config-path").arg("/risingwave.toml");
229 }
230
231 let environment = get_cmd_envs(&command, true)?;
232 let command = get_cmd_args(&command, true)?;
233
234 Ok(ComposeService {
235 image: config.image.risingwave.clone(),
236 environment,
237 volumes: ["./risingwave.toml:/risingwave.toml".to_owned()]
238 .into_iter()
239 .collect(),
240 command,
241 expose: vec![
242 self.port.to_string(),
243 self.exporter_port.to_string(),
244 self.dashboard_port.to_string(),
245 ],
246 healthcheck: Some(health_check_port(self.port)),
247 ..Default::default()
248 })
249 }
250}
251
252impl Compose for FrontendConfig {
253 fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
254 let mut command = Command::new("frontend-node");
255 FrontendService::apply_command_args(&mut command, self)?;
256 let provide_meta_node = self.provide_meta_node.as_ref().unwrap();
257
258 if let Some(c) = &config.rw_config_path {
259 let target = Path::new(&config.config_directory).join("risingwave.toml");
260 fs_err::copy(c, target)?;
261 command.arg("--config-path").arg("/risingwave.toml");
262 }
263
264 let environment = get_cmd_envs(&command, true)?;
265 let command = get_cmd_args(&command, true)?;
266
267 Ok(ComposeService {
268 image: config.image.risingwave.clone(),
269 environment,
270 volumes: ["./risingwave.toml:/risingwave.toml".to_owned()]
271 .into_iter()
272 .collect(),
273 command,
274 ports: vec![format!("{}:{}", self.port, self.port)],
275 expose: vec![self.port.to_string()],
276 depends_on: provide_meta_node.iter().map(|x| x.id.clone()).collect(),
277 healthcheck: Some(health_check_port(self.port)),
278 ..Default::default()
279 })
280 }
281}
282
283impl Compose for CompactorConfig {
284 fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
285 let mut command = Command::new("compactor-node");
286 CompactorService::apply_command_args(&mut command, self)?;
287
288 if let Some(c) = &config.rw_config_path {
289 let target = Path::new(&config.config_directory).join("risingwave.toml");
290 fs_err::copy(c, target)?;
291 command.arg("--config-path").arg("/risingwave.toml");
292 }
293
294 let provide_meta_node = self.provide_meta_node.as_ref().unwrap();
295 let provide_minio = self.provide_minio.as_ref().unwrap();
296
297 let environment = get_cmd_envs(&command, true)?;
298 let command = get_cmd_args(&command, true)?;
299
300 Ok(ComposeService {
301 image: config.image.risingwave.clone(),
302 environment,
303 volumes: ["./risingwave.toml:/risingwave.toml".to_owned()]
304 .into_iter()
305 .collect(),
306 command,
307 expose: vec![self.port.to_string(), self.exporter_port.to_string()],
308 depends_on: provide_meta_node
309 .iter()
310 .map(|x| x.id.clone())
311 .chain(provide_minio.iter().map(|x| x.id.clone()))
312 .collect(),
313 healthcheck: Some(health_check_port(self.port)),
314 ..Default::default()
315 })
316 }
317}
318
319impl Compose for MinioConfig {
320 fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
321 let mut command = Command::new("minio");
322 MinioService::apply_command_args(&mut command, self)?;
323 command.arg("/data");
324
325 let env = get_cmd_envs(&command, false)?;
326 let command = get_cmd_args(&command, false)?;
327 let bucket_name = &self.hummock_bucket;
328
329 let entrypoint = format!(
330 r#"
331/bin/sh -c '
332set -e
333mkdir -p "/data/{bucket_name}"
334/usr/bin/docker-entrypoint.sh "$$0" "$$@"
335'"#
336 );
337
338 Ok(ComposeService {
339 image: config.image.minio.clone(),
340 command,
341 environment: env,
342 entrypoint: Some(entrypoint),
343 ports: vec![
344 format!("{}:{}", self.port, self.port),
345 format!("{}:{}", self.console_port, self.console_port),
346 ],
347 volumes: vec![format!("{}:/data", self.id)],
348 expose: vec![self.port.to_string(), self.console_port.to_string()],
349 healthcheck: Some(health_check_port(self.port)),
350 ..Default::default()
351 })
352 }
353}
354
355impl Compose for RedPandaConfig {
356 fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
357 let mut command = Command::new("redpanda");
358
359 command
360 .arg("start")
361 .arg("--smp")
362 .arg(self.cpus.to_string())
363 .arg("--reserve-memory")
364 .arg("0")
365 .arg("--memory")
366 .arg(&self.memory)
367 .arg("--overprovisioned")
368 .arg("--node-id")
369 .arg("0")
370 .arg("--check=false");
371
372 command.arg("--kafka-addr").arg(format!(
373 "PLAINTEXT://0.0.0.0:{},OUTSIDE://0.0.0.0:{}",
374 self.internal_port, self.outside_port
375 ));
376
377 command.arg("--advertise-kafka-addr").arg(format!(
378 "PLAINTEXT://{}:{},OUTSIDE://localhost:{}",
379 self.address, self.internal_port, self.outside_port
380 ));
381
382 let command = get_cmd_args(&command, true)?;
383
384 Ok(ComposeService {
385 image: config.image.redpanda.clone(),
386 command,
387 expose: vec![
388 self.internal_port.to_string(),
389 self.outside_port.to_string(),
390 ],
391 volumes: vec![format!("{}:/var/lib/redpanda/data", self.id)],
392 ports: vec![
393 format!("{}:{}", self.outside_port, self.outside_port),
394 "9644:9644".to_owned(),
396 ],
397 healthcheck: Some(health_check_port(self.outside_port)),
398 ..Default::default()
399 })
400 }
401}
402
403impl Compose for PrometheusConfig {
404 fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
405 let mut command = Command::new("prometheus");
406 command
407 .arg("--config.file=/etc/prometheus/prometheus.yml")
408 .arg("--storage.tsdb.path=/prometheus")
409 .arg("--web.console.libraries=/usr/share/prometheus/console_libraries")
410 .arg("--web.console.templates=/usr/share/prometheus/consoles");
411 PrometheusService::apply_command_args(&mut command, self)?;
412 let command = get_cmd_args(&command, false)?;
413
414 let prometheus_config = PrometheusGen.gen_prometheus_yml(self);
415
416 let mut service = ComposeService {
417 image: config.image.prometheus.clone(),
418 command,
419 expose: vec![self.port.to_string()],
420 ports: vec![format!("{}:{}", self.port, self.port)],
421 volumes: vec![format!("{}:/prometheus", self.id)],
422 healthcheck: Some(health_check_port_prometheus(self.port)),
423 ..Default::default()
424 };
425
426 fs_err::write(
427 Path::new(&config.config_directory).join("prometheus.yaml"),
428 prometheus_config,
429 )?;
430 service
431 .volumes
432 .push("./prometheus.yaml:/etc/prometheus/prometheus.yml".into());
433
434 Ok(service)
435 }
436}
437
438impl Compose for GrafanaConfig {
439 fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
440 let config_root = Path::new(&config.config_directory);
441 fs_err::write(
442 config_root.join("grafana.ini"),
443 GrafanaGen.gen_custom_ini(self),
444 )?;
445
446 fs_err::write(
447 config_root.join("risedev-prometheus.yml"),
448 GrafanaGen.gen_prometheus_datasource_yml(self)?,
449 )?;
450
451 fs_err::write(
452 config_root.join("risedev-dashboard.yml"),
453 GrafanaGen.gen_dashboard_yml(self, config_root, "/")?,
454 )?;
455
456 let mut service = ComposeService {
457 image: config.image.grafana.clone(),
458 expose: vec![self.port.to_string()],
459 ports: vec![format!("{}:{}", self.port, self.port)],
460 volumes: vec![
461 format!("{}:/var/lib/grafana", self.id),
462 "./grafana.ini:/etc/grafana/grafana.ini".to_owned(),
463 "./risedev-prometheus.yml:/etc/grafana/provisioning/datasources/risedev-prometheus.yml".to_owned(),
464 "./risedev-dashboard.yml:/etc/grafana/provisioning/dashboards/risedev-dashboard.yml".to_owned(),
465 "./risingwave-dashboard.json:/risingwave-dashboard.json".to_owned()
466 ],
467 healthcheck: Some(health_check_port(self.port)),
468 ..Default::default()
469 };
470
471 if !self.provide_tempo.as_ref().unwrap().is_empty() {
472 fs_err::write(
473 config_root.join("risedev-tempo.yml"),
474 GrafanaGen.gen_tempo_datasource_yml(self)?,
475 )?;
476 service.volumes.push(
477 "./risedev-tempo.yml:/etc/grafana/provisioning/datasources/risedev-tempo.yml"
478 .to_owned(),
479 );
480 }
481
482 Ok(service)
483 }
484}
485
486impl Compose for TempoConfig {
487 fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
488 let mut command = Command::new("tempo");
489 TempoService::apply_command_args(&mut command, "/tmp/tempo", "/etc/tempo.yaml")?;
490 let command = get_cmd_args(&command, false)?;
491
492 let config_root = Path::new(&config.config_directory);
493 let config_file_path = config_root.join("tempo.yaml");
494 fs_err::write(config_file_path, TempoGen.gen_tempo_yml(self))?;
495
496 let service = ComposeService {
497 image: config.image.tempo.clone(),
498 command,
499 expose: vec![self.port.to_string(), self.otlp_port.to_string()],
500 ports: vec![format!("{}:{}", self.port, self.port)],
501 volumes: vec![format!("./tempo.yaml:/etc/tempo.yaml")],
502 ..Default::default()
503 };
504
505 Ok(service)
506 }
507}