risedev/
compose.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Generate docker compose yaml files for risedev components.
16
17use std::collections::BTreeMap;
18use std::path::Path;
19use std::process::Command;
20
21use anyhow::{Result, anyhow};
22use serde::{Deserialize, Serialize};
23
24use crate::{
25    CompactorConfig, CompactorService, ComputeNodeConfig, ComputeNodeService, FrontendConfig,
26    FrontendService, GrafanaConfig, GrafanaGen, HummockInMemoryStrategy, MetaNodeConfig,
27    MetaNodeService, MinioConfig, MinioService, PrometheusConfig, PrometheusGen, PrometheusService,
28    RedPandaConfig, TempoConfig, TempoGen, TempoService,
29};
30
31#[serde_with::skip_serializing_none]
32#[derive(Debug, Clone, Serialize, Default)]
33pub struct ComposeService {
34    pub image: String,
35    pub command: Vec<String>,
36    pub expose: Vec<String>,
37    pub ports: Vec<String>,
38    pub depends_on: Vec<String>,
39    pub volumes: Vec<String>,
40    pub entrypoint: Option<String>,
41    pub environment: BTreeMap<String, String>,
42    pub user: Option<String>,
43    pub container_name: String,
44    pub network_mode: Option<String>,
45    pub healthcheck: Option<HealthCheck>,
46}
47
48#[derive(Debug, Clone, Serialize, Default)]
49pub struct HealthCheck {
50    test: Vec<String>,
51    interval: String,
52    timeout: String,
53    retries: usize,
54}
55
56#[derive(Debug, Clone, Serialize)]
57pub struct ComposeFile {
58    pub services: BTreeMap<String, ComposeService>,
59    pub volumes: BTreeMap<String, ComposeVolume>,
60    pub name: String,
61}
62
63#[derive(Debug, Clone, Deserialize)]
64#[serde(rename_all = "kebab-case")]
65#[serde(deny_unknown_fields)]
66pub struct DockerImageConfig {
67    pub risingwave: String,
68    pub prometheus: String,
69    pub grafana: String,
70    pub tempo: String,
71    pub minio: String,
72    pub redpanda: String,
73}
74
75#[derive(Debug)]
76pub struct ComposeConfig {
77    /// Docker compose image config
78    pub image: DockerImageConfig,
79
80    /// The directory to output all configs. If disabled, all config files will be embedded into
81    /// the docker-compose file.
82    pub config_directory: String,
83
84    /// The path of `risingwave.toml`
85    pub rw_config_path: Option<String>,
86}
87
88#[derive(Debug, Clone, Serialize, Default)]
89pub struct ComposeVolume {
90    pub external: bool,
91}
92
93/// Generate compose yaml files for a component.
94pub trait Compose {
95    fn compose(&self, config: &ComposeConfig) -> Result<ComposeService>;
96}
97
98fn get_cmd_args(cmd: &Command, with_argv_0: bool) -> Result<Vec<String>> {
99    let mut result = vec![];
100    if with_argv_0 {
101        result.push(
102            cmd.get_program()
103                .to_str()
104                .ok_or_else(|| anyhow!("Cannot convert to UTF-8 string"))?
105                .to_owned(),
106        );
107    }
108    for arg in cmd.get_args() {
109        result.push(
110            arg.to_str()
111                .ok_or_else(|| anyhow!("Cannot convert to UTF-8 string"))?
112                .to_owned(),
113        );
114    }
115    Ok(result)
116}
117
118fn get_cmd_envs(cmd: &Command, rust_backtrace: bool) -> Result<BTreeMap<String, String>> {
119    let mut result = BTreeMap::new();
120    if rust_backtrace {
121        result.insert("RUST_BACKTRACE".to_owned(), "1".to_owned());
122    }
123
124    for (k, v) in cmd.get_envs() {
125        let k = k
126            .to_str()
127            .ok_or_else(|| anyhow!("Cannot convert to UTF-8 string"))?
128            .to_owned();
129        let v = if let Some(v) = v {
130            Some(
131                v.to_str()
132                    .ok_or_else(|| anyhow!("Cannot convert to UTF-8 string"))?
133                    .to_owned(),
134            )
135        } else {
136            None
137        };
138        result.insert(k, v.unwrap_or_default());
139    }
140    Ok(result)
141}
142
143fn health_check_port(port: u16) -> HealthCheck {
144    HealthCheck {
145        test: vec![
146            "CMD-SHELL".into(),
147            format!(
148                "bash -c 'printf \"GET / HTTP/1.1\\n\\n\" > /dev/tcp/127.0.0.1/{}; exit $?;'",
149                port
150            ),
151        ],
152        interval: "1s".to_owned(),
153        timeout: "5s".to_owned(),
154        retries: 5,
155    }
156}
157
158fn health_check_port_prometheus(port: u16) -> HealthCheck {
159    HealthCheck {
160        test: vec![
161            "CMD-SHELL".into(),
162            format!(
163                "sh -c 'printf \"GET /-/healthy HTTP/1.0\\n\\n\" | nc localhost {}; exit $?;'",
164                port
165            ),
166        ],
167        interval: "1s".to_owned(),
168        timeout: "5s".to_owned(),
169        retries: 5,
170    }
171}
172
173impl Compose for ComputeNodeConfig {
174    fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
175        let mut command = Command::new("compute-node");
176        ComputeNodeService::apply_command_args(&mut command, self)?;
177        if self.enable_tiered_cache {
178            command.arg("--data-file-cache-dir").arg("/foyer/data");
179            command.arg("--meta-file-cache-dir").arg("/foyer/meta");
180        }
181
182        if let Some(c) = &config.rw_config_path {
183            let target = Path::new(&config.config_directory).join("risingwave.toml");
184            fs_err::copy(c, target)?;
185            command.arg("--config-path").arg("/risingwave.toml");
186        }
187
188        let environment = get_cmd_envs(&command, true)?;
189        let command = get_cmd_args(&command, true)?;
190
191        let provide_meta_node = self.provide_meta_node.as_ref().unwrap();
192        let provide_minio = self.provide_minio.as_ref().unwrap();
193
194        Ok(ComposeService {
195            image: config.image.risingwave.clone(),
196            environment,
197            volumes: [
198                "./risingwave.toml:/risingwave.toml".to_owned(),
199                format!("{}:/filecache", self.id),
200            ]
201            .into_iter()
202            .collect(),
203            command,
204            expose: vec![self.port.to_string(), self.exporter_port.to_string()],
205            depends_on: provide_meta_node
206                .iter()
207                .map(|x| x.id.clone())
208                .chain(provide_minio.iter().map(|x| x.id.clone()))
209                .collect(),
210            healthcheck: Some(health_check_port(self.port)),
211            ..Default::default()
212        })
213    }
214}
215
216impl Compose for MetaNodeConfig {
217    fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
218        let mut command = Command::new("meta-node");
219        MetaNodeService::apply_command_args(
220            &mut command,
221            self,
222            HummockInMemoryStrategy::Disallowed,
223        )?;
224
225        if let Some(c) = &config.rw_config_path {
226            let target = Path::new(&config.config_directory).join("risingwave.toml");
227            fs_err::copy(c, target)?;
228            command.arg("--config-path").arg("/risingwave.toml");
229        }
230
231        let environment = get_cmd_envs(&command, true)?;
232        let command = get_cmd_args(&command, true)?;
233
234        Ok(ComposeService {
235            image: config.image.risingwave.clone(),
236            environment,
237            volumes: ["./risingwave.toml:/risingwave.toml".to_owned()]
238                .into_iter()
239                .collect(),
240            command,
241            expose: vec![
242                self.port.to_string(),
243                self.exporter_port.to_string(),
244                self.dashboard_port.to_string(),
245            ],
246            healthcheck: Some(health_check_port(self.port)),
247            ..Default::default()
248        })
249    }
250}
251
252impl Compose for FrontendConfig {
253    fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
254        let mut command = Command::new("frontend-node");
255        FrontendService::apply_command_args(&mut command, self)?;
256        let provide_meta_node = self.provide_meta_node.as_ref().unwrap();
257
258        if let Some(c) = &config.rw_config_path {
259            let target = Path::new(&config.config_directory).join("risingwave.toml");
260            fs_err::copy(c, target)?;
261            command.arg("--config-path").arg("/risingwave.toml");
262        }
263
264        let environment = get_cmd_envs(&command, true)?;
265        let command = get_cmd_args(&command, true)?;
266
267        Ok(ComposeService {
268            image: config.image.risingwave.clone(),
269            environment,
270            volumes: ["./risingwave.toml:/risingwave.toml".to_owned()]
271                .into_iter()
272                .collect(),
273            command,
274            ports: vec![format!("{}:{}", self.port, self.port)],
275            expose: vec![self.port.to_string()],
276            depends_on: provide_meta_node.iter().map(|x| x.id.clone()).collect(),
277            healthcheck: Some(health_check_port(self.port)),
278            ..Default::default()
279        })
280    }
281}
282
283impl Compose for CompactorConfig {
284    fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
285        let mut command = Command::new("compactor-node");
286        CompactorService::apply_command_args(&mut command, self)?;
287
288        if let Some(c) = &config.rw_config_path {
289            let target = Path::new(&config.config_directory).join("risingwave.toml");
290            fs_err::copy(c, target)?;
291            command.arg("--config-path").arg("/risingwave.toml");
292        }
293
294        let provide_meta_node = self.provide_meta_node.as_ref().unwrap();
295        let provide_minio = self.provide_minio.as_ref().unwrap();
296
297        let environment = get_cmd_envs(&command, true)?;
298        let command = get_cmd_args(&command, true)?;
299
300        Ok(ComposeService {
301            image: config.image.risingwave.clone(),
302            environment,
303            volumes: ["./risingwave.toml:/risingwave.toml".to_owned()]
304                .into_iter()
305                .collect(),
306            command,
307            expose: vec![self.port.to_string(), self.exporter_port.to_string()],
308            depends_on: provide_meta_node
309                .iter()
310                .map(|x| x.id.clone())
311                .chain(provide_minio.iter().map(|x| x.id.clone()))
312                .collect(),
313            healthcheck: Some(health_check_port(self.port)),
314            ..Default::default()
315        })
316    }
317}
318
319impl Compose for MinioConfig {
320    fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
321        let mut command = Command::new("minio");
322        MinioService::apply_command_args(&mut command, self)?;
323        command.arg("/data");
324
325        let env = get_cmd_envs(&command, false)?;
326        let command = get_cmd_args(&command, false)?;
327        let bucket_name = &self.hummock_bucket;
328
329        let entrypoint = format!(
330            r#"
331/bin/sh -c '
332set -e
333mkdir -p "/data/{bucket_name}"
334/usr/bin/docker-entrypoint.sh "$$0" "$$@"
335'"#
336        );
337
338        Ok(ComposeService {
339            image: config.image.minio.clone(),
340            command,
341            environment: env,
342            entrypoint: Some(entrypoint),
343            ports: vec![
344                format!("{}:{}", self.port, self.port),
345                format!("{}:{}", self.console_port, self.console_port),
346            ],
347            volumes: vec![format!("{}:/data", self.id)],
348            expose: vec![self.port.to_string(), self.console_port.to_string()],
349            healthcheck: Some(health_check_port(self.port)),
350            ..Default::default()
351        })
352    }
353}
354
355impl Compose for RedPandaConfig {
356    fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
357        let mut command = Command::new("redpanda");
358
359        command
360            .arg("start")
361            .arg("--smp")
362            .arg(self.cpus.to_string())
363            .arg("--reserve-memory")
364            .arg("0")
365            .arg("--memory")
366            .arg(&self.memory)
367            .arg("--overprovisioned")
368            .arg("--node-id")
369            .arg("0")
370            .arg("--check=false");
371
372        command.arg("--kafka-addr").arg(format!(
373            "PLAINTEXT://0.0.0.0:{},OUTSIDE://0.0.0.0:{}",
374            self.internal_port, self.outside_port
375        ));
376
377        command.arg("--advertise-kafka-addr").arg(format!(
378            "PLAINTEXT://{}:{},OUTSIDE://localhost:{}",
379            self.address, self.internal_port, self.outside_port
380        ));
381
382        let command = get_cmd_args(&command, true)?;
383
384        Ok(ComposeService {
385            image: config.image.redpanda.clone(),
386            command,
387            expose: vec![
388                self.internal_port.to_string(),
389                self.outside_port.to_string(),
390            ],
391            volumes: vec![format!("{}:/var/lib/redpanda/data", self.id)],
392            ports: vec![
393                format!("{}:{}", self.outside_port, self.outside_port),
394                // Redpanda admin port
395                "9644:9644".to_owned(),
396            ],
397            healthcheck: Some(health_check_port(self.outside_port)),
398            ..Default::default()
399        })
400    }
401}
402
403impl Compose for PrometheusConfig {
404    fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
405        let mut command = Command::new("prometheus");
406        command
407            .arg("--config.file=/etc/prometheus/prometheus.yml")
408            .arg("--storage.tsdb.path=/prometheus")
409            .arg("--web.console.libraries=/usr/share/prometheus/console_libraries")
410            .arg("--web.console.templates=/usr/share/prometheus/consoles");
411        PrometheusService::apply_command_args(&mut command, self)?;
412        let command = get_cmd_args(&command, false)?;
413
414        let prometheus_config = PrometheusGen.gen_prometheus_yml(self);
415
416        let mut service = ComposeService {
417            image: config.image.prometheus.clone(),
418            command,
419            expose: vec![self.port.to_string()],
420            ports: vec![format!("{}:{}", self.port, self.port)],
421            volumes: vec![format!("{}:/prometheus", self.id)],
422            healthcheck: Some(health_check_port_prometheus(self.port)),
423            ..Default::default()
424        };
425
426        fs_err::write(
427            Path::new(&config.config_directory).join("prometheus.yaml"),
428            prometheus_config,
429        )?;
430        service
431            .volumes
432            .push("./prometheus.yaml:/etc/prometheus/prometheus.yml".into());
433
434        Ok(service)
435    }
436}
437
438impl Compose for GrafanaConfig {
439    fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
440        let config_root = Path::new(&config.config_directory);
441        fs_err::write(
442            config_root.join("grafana.ini"),
443            GrafanaGen.gen_custom_ini(self),
444        )?;
445
446        fs_err::write(
447            config_root.join("risedev-prometheus.yml"),
448            GrafanaGen.gen_prometheus_datasource_yml(self)?,
449        )?;
450
451        fs_err::write(
452            config_root.join("risedev-dashboard.yml"),
453            GrafanaGen.gen_dashboard_yml(self, config_root, "/")?,
454        )?;
455
456        let mut service = ComposeService {
457            image: config.image.grafana.clone(),
458            expose: vec![self.port.to_string()],
459            ports: vec![format!("{}:{}", self.port, self.port)],
460            volumes: vec![
461                format!("{}:/var/lib/grafana", self.id),
462                "./grafana.ini:/etc/grafana/grafana.ini".to_owned(),
463                "./risedev-prometheus.yml:/etc/grafana/provisioning/datasources/risedev-prometheus.yml".to_owned(),
464                "./risedev-dashboard.yml:/etc/grafana/provisioning/dashboards/risedev-dashboard.yml".to_owned(),
465                "./risingwave-dashboard.json:/risingwave-dashboard.json".to_owned()
466            ],
467            healthcheck: Some(health_check_port(self.port)),
468            ..Default::default()
469        };
470
471        if !self.provide_tempo.as_ref().unwrap().is_empty() {
472            fs_err::write(
473                config_root.join("risedev-tempo.yml"),
474                GrafanaGen.gen_tempo_datasource_yml(self)?,
475            )?;
476            service.volumes.push(
477                "./risedev-tempo.yml:/etc/grafana/provisioning/datasources/risedev-tempo.yml"
478                    .to_owned(),
479            );
480        }
481
482        Ok(service)
483    }
484}
485
486impl Compose for TempoConfig {
487    fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
488        let mut command = Command::new("tempo");
489        TempoService::apply_command_args(&mut command, "/tmp/tempo", "/etc/tempo.yaml")?;
490        let command = get_cmd_args(&command, false)?;
491
492        let config_root = Path::new(&config.config_directory);
493        let config_file_path = config_root.join("tempo.yaml");
494        fs_err::write(config_file_path, TempoGen.gen_tempo_yml(self))?;
495
496        let service = ComposeService {
497            image: config.image.tempo.clone(),
498            command,
499            expose: vec![self.port.to_string(), self.otlp_port.to_string()],
500            ports: vec![format!("{}:{}", self.port, self.port)],
501            volumes: vec![format!("./tempo.yaml:/etc/tempo.yaml")],
502            ..Default::default()
503        };
504
505        Ok(service)
506    }
507}