risedev_compose/
risedev-compose.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::io::Read;
17use std::path::Path;
18
19use anyhow::{Result, anyhow};
20use clap::Parser;
21use console::style;
22use fs_err::File;
23use itertools::Itertools;
24use risedev::{
25    Compose, ComposeConfig, ComposeDeployConfig, ComposeFile, ComposeService, ComposeVolume,
26    ConfigExpander, DockerImageConfig, RISEDEV_CONFIG_FILE, ServiceConfig, compose_deploy,
27    generate_risedev_env,
28};
29use serde::Deserialize;
30
31#[derive(Parser)]
32#[clap(author, version, about, long_about = None)]
33#[clap(propagate_version = true)]
34pub struct RiseDevComposeOpts {
35    #[clap(short, long)]
36    directory: String,
37
38    #[clap(default_value = "compose")]
39    profile: String,
40
41    /// Whether to generate deployment script. If enabled, network mode will be set to host, a
42    /// deploy.sh will be generated.
43    #[clap(long)]
44    deploy: bool,
45}
46
47fn load_docker_image_config(
48    risedev_config: &str,
49    override_risingwave_image: Option<&String>,
50) -> Result<DockerImageConfig> {
51    #[derive(Deserialize)]
52    struct ConfigInRiseDev {
53        compose: DockerImageConfig,
54    }
55    let mut config: ConfigInRiseDev = serde_yaml::from_str(risedev_config)?;
56    if let Some(override_risingwave_image) = override_risingwave_image {
57        config.compose.risingwave = override_risingwave_image.to_string();
58    }
59    Ok(config.compose)
60}
61
62fn main() -> Result<()> {
63    let opts = RiseDevComposeOpts::parse();
64
65    let (risedev_config, compose_deploy_config, rw_config_path) = if opts.deploy {
66        let compose_deploy_config = {
67            let mut content = String::new();
68            File::open("risedev-compose.yml")?.read_to_string(&mut content)?;
69            content
70        };
71        let compose_deploy_config: ComposeDeployConfig =
72            serde_yaml::from_str(&compose_deploy_config)?;
73        let extra_info = compose_deploy_config
74            .instances
75            .iter()
76            .map(|i| (format!("dns-host:{}", i.id), i.dns_host.clone()))
77            .chain(
78                compose_deploy_config
79                    .risedev_extra_args
80                    .iter()
81                    .map(|(k, v)| (k.clone(), v.clone())),
82            )
83            .collect();
84
85        let (config_path, _env, expanded_config) =
86            ConfigExpander::expand_with_extra_info(".", &opts.profile, extra_info)?;
87        (expanded_config, Some(compose_deploy_config), config_path)
88    } else {
89        let (config_path, _env, expanded_config) = ConfigExpander::expand(".", &opts.profile)?;
90        (expanded_config, None, config_path)
91    };
92
93    let compose_config = ComposeConfig {
94        image: load_docker_image_config(
95            &fs_err::read_to_string(RISEDEV_CONFIG_FILE)?,
96            compose_deploy_config
97                .as_ref()
98                .and_then(|x| x.risingwave_image_override.as_ref()),
99        )?,
100        config_directory: opts.directory.clone(),
101        rw_config_path,
102    };
103
104    let services = ConfigExpander::deserialize(&risedev_config)?;
105
106    let mut compose_services: BTreeMap<String, BTreeMap<String, ComposeService>> = BTreeMap::new();
107    let mut service_on_node: BTreeMap<String, String> = BTreeMap::new();
108    let mut volumes = BTreeMap::new();
109
110    let mut log_buffer = String::new();
111    use std::fmt::Write;
112
113    for service in &services {
114        let step = service.id();
115
116        let compose_deploy_config = compose_deploy_config.as_ref();
117        let (address, mut compose) = match service {
118            ServiceConfig::Minio(c) => {
119                volumes.insert(c.id.clone(), ComposeVolume::default());
120                (c.address.clone(), c.compose(&compose_config)?)
121            }
122            ServiceConfig::Sqlite(_) => continue,
123            ServiceConfig::Prometheus(c) => {
124                volumes.insert(c.id.clone(), ComposeVolume::default());
125                (c.address.clone(), c.compose(&compose_config)?)
126            }
127            ServiceConfig::ComputeNode(c) => {
128                volumes.insert(c.id.clone(), ComposeVolume::default());
129                (c.address.clone(), c.compose(&compose_config)?)
130            }
131            ServiceConfig::MetaNode(c) => {
132                if opts.deploy {
133                    let public_ip = &compose_deploy_config
134                        .unwrap()
135                        .lookup_instance_by_host(&c.address)
136                        .public_ip;
137                    writeln!(
138                        log_buffer,
139                        "-- Dashboard --\nuse VSCode to forward {} from {}\nor use {}\n",
140                        style(format!("{}", c.dashboard_port)).green(),
141                        style(format!("ubuntu@{}", public_ip)).green(),
142                        style(format!(
143                            "ssh -N -L {}:localhost:{} ubuntu@{}",
144                            c.dashboard_port, c.dashboard_port, public_ip
145                        ))
146                        .green()
147                    )?;
148                }
149                (c.address.clone(), c.compose(&compose_config)?)
150            }
151            ServiceConfig::Frontend(c) => {
152                if opts.deploy {
153                    let arg = format!("--frontend {} --frontend-port {}", c.address, c.port);
154                    writeln!(
155                        log_buffer,
156                        "-- Frontend --\nAccess inside cluster: {}\ntpch-bench args: {}\n",
157                        style(format!(
158                            "psql -d dev -h {} -p {} -U root",
159                            c.address, c.port
160                        ))
161                        .green(),
162                        style(&arg).green()
163                    )?;
164                    fs_err::write(
165                        Path::new(&opts.directory).join("tpch-bench-args-frontend"),
166                        arg,
167                    )?;
168                }
169                (c.address.clone(), c.compose(&compose_config)?)
170            }
171            ServiceConfig::Compactor(c) => (c.address.clone(), c.compose(&compose_config)?),
172            ServiceConfig::Grafana(c) => {
173                if opts.deploy {
174                    let public_ip = &compose_deploy_config
175                        .unwrap()
176                        .lookup_instance_by_host(&c.address)
177                        .public_ip;
178                    writeln!(
179                        log_buffer,
180                        "-- Grafana --\nuse VSCode to forward {} from {}\nor use {}\n",
181                        style(format!("{}", c.port)).green(),
182                        style(format!("ubuntu@{}", public_ip)).green(),
183                        style(format!(
184                            "ssh -N -L {}:localhost:{} ubuntu@{}",
185                            c.port, c.port, public_ip
186                        ))
187                        .green()
188                    )?;
189                }
190                volumes.insert(c.id.clone(), ComposeVolume::default());
191                (c.address.clone(), c.compose(&compose_config)?)
192            }
193            ServiceConfig::Tempo(c) => (c.address.clone(), c.compose(&compose_config)?),
194            ServiceConfig::Kafka(_) => {
195                return Err(anyhow!("not supported, please use redpanda instead"));
196            }
197            ServiceConfig::Pubsub(_) => {
198                return Err(anyhow!("not supported, please use redpanda instead"));
199            }
200            ServiceConfig::Opendal(_) => continue,
201            ServiceConfig::AwsS3(_) => continue,
202            ServiceConfig::RedPanda(c) => {
203                if opts.deploy {
204                    let arg = format!("--kafka-addr {}:{}", c.address, c.internal_port);
205                    writeln!(
206                        log_buffer,
207                        "-- Redpanda --\ntpch-bench: {}\n",
208                        style(&arg).green()
209                    )?;
210                    fs_err::write(
211                        Path::new(&opts.directory).join("tpch-bench-args-kafka"),
212                        arg,
213                    )?;
214                }
215                volumes.insert(c.id.clone(), ComposeVolume::default());
216                (c.address.clone(), c.compose(&compose_config)?)
217            }
218            ServiceConfig::Redis(_)
219            | ServiceConfig::MySql(_)
220            | ServiceConfig::Postgres(_)
221            | ServiceConfig::SqlServer(_)
222            | ServiceConfig::SchemaRegistry(_) => return Err(anyhow!("not supported")),
223        };
224        compose.container_name = service.id().to_owned();
225        if opts.deploy {
226            compose.network_mode = Some("host".into());
227            compose.depends_on = vec![];
228        }
229        compose_services
230            .entry(address.clone())
231            .or_default()
232            .insert(step.to_owned(), compose);
233        service_on_node.insert(step.to_owned(), address);
234    }
235
236    if opts.deploy {
237        for (node, services) in &compose_services {
238            let mut node_volumes = BTreeMap::new();
239            services.keys().for_each(|k| {
240                if let Some(v) = volumes.get(k) {
241                    node_volumes.insert(k.clone(), v.clone());
242                }
243            });
244            let compose_file = ComposeFile {
245                services: services.clone(),
246                volumes: node_volumes,
247                name: format!("risingwave-{}", opts.profile),
248            };
249
250            let yaml = serde_yaml::to_string(&compose_file)?;
251
252            let ec2_instance = compose_deploy_config
253                .as_ref()
254                .unwrap()
255                .lookup_instance_by_host(node);
256            if ec2_instance.r#type == "meta" {
257                let public_ip = &ec2_instance.public_ip;
258                writeln!(
259                    log_buffer,
260                    "-- Meta Node --\nLogin to meta node by {}\nor using VSCode {}\n",
261                    style(format!("ssh ubuntu@{}", public_ip)).green(),
262                    style(format!(
263                        "code --remote ssh-remote+ubuntu@{} <path>",
264                        public_ip
265                    ))
266                    .green()
267                )?;
268            }
269
270            fs_err::write(
271                Path::new(&opts.directory).join(format!("{}.yml", node)),
272                yaml,
273            )?;
274        }
275
276        if let Some(env) = Some(generate_risedev_env(&services)).filter(|x| !x.is_empty()) {
277            writeln!(log_buffer, "-- risedev-env --\n{}\n", style(env).green())?;
278        }
279
280        compose_deploy(
281            Path::new(&opts.directory),
282            &services.iter().map(|s| s.id().to_owned()).collect_vec(),
283            &compose_deploy_config.as_ref().unwrap().instances,
284            &compose_config,
285            &service_on_node,
286        )?;
287
288        println!("\n{}", log_buffer);
289
290        fs_err::write(
291            Path::new(&opts.directory).join("_message.partial.sh"),
292            log_buffer,
293        )?;
294    } else {
295        let mut services = BTreeMap::new();
296        for (_, s) in compose_services {
297            for (k, v) in s {
298                services.insert(k, v);
299            }
300        }
301        let compose_file = ComposeFile {
302            services,
303            volumes,
304            name: format!("risingwave-{}", opts.profile),
305        };
306
307        let yaml = serde_yaml::to_string(&compose_file)?;
308
309        fs_err::write(Path::new(&opts.directory).join("docker-compose.yml"), yaml)?;
310    }
311
312    Ok(())
313}