1use std::collections::BTreeMap;
16use std::io::Read;
17use std::path::Path;
18
19use anyhow::{Result, anyhow};
20use clap::Parser;
21use console::style;
22use fs_err::File;
23use itertools::Itertools;
24use risedev::{
25 Compose, ComposeConfig, ComposeDeployConfig, ComposeFile, ComposeService, ComposeVolume,
26 ConfigExpander, DockerImageConfig, RISEDEV_CONFIG_FILE, ServiceConfig, compose_deploy,
27 generate_risedev_env,
28};
29use serde::Deserialize;
30
31#[derive(Parser)]
32#[clap(author, version, about, long_about = None)]
33#[clap(propagate_version = true)]
34pub struct RiseDevComposeOpts {
35 #[clap(short, long)]
36 directory: String,
37
38 #[clap(default_value = "compose")]
39 profile: String,
40
41 #[clap(long)]
44 deploy: bool,
45}
46
47fn load_docker_image_config(
48 risedev_config: &str,
49 override_risingwave_image: Option<&String>,
50) -> Result<DockerImageConfig> {
51 #[derive(Deserialize)]
52 struct ConfigInRiseDev {
53 compose: DockerImageConfig,
54 }
55 let mut config: ConfigInRiseDev = serde_yaml::from_str(risedev_config)?;
56 if let Some(override_risingwave_image) = override_risingwave_image {
57 config.compose.risingwave = override_risingwave_image.to_string();
58 }
59 Ok(config.compose)
60}
61
62fn main() -> Result<()> {
63 let opts = RiseDevComposeOpts::parse();
64
65 let (risedev_config, compose_deploy_config, rw_config_path) = if opts.deploy {
66 let compose_deploy_config = {
67 let mut content = String::new();
68 File::open("risedev-compose.yml")?.read_to_string(&mut content)?;
69 content
70 };
71 let compose_deploy_config: ComposeDeployConfig =
72 serde_yaml::from_str(&compose_deploy_config)?;
73 let extra_info = compose_deploy_config
74 .instances
75 .iter()
76 .map(|i| (format!("dns-host:{}", i.id), i.dns_host.clone()))
77 .chain(
78 compose_deploy_config
79 .risedev_extra_args
80 .iter()
81 .map(|(k, v)| (k.clone(), v.clone())),
82 )
83 .collect();
84
85 let (config_path, _env, expanded_config) =
86 ConfigExpander::expand_with_extra_info(".", &opts.profile, extra_info)?;
87 (expanded_config, Some(compose_deploy_config), config_path)
88 } else {
89 let (config_path, _env, expanded_config) = ConfigExpander::expand(".", &opts.profile)?;
90 (expanded_config, None, config_path)
91 };
92
93 let compose_config = ComposeConfig {
94 image: load_docker_image_config(
95 &fs_err::read_to_string(RISEDEV_CONFIG_FILE)?,
96 compose_deploy_config
97 .as_ref()
98 .and_then(|x| x.risingwave_image_override.as_ref()),
99 )?,
100 config_directory: opts.directory.clone(),
101 rw_config_path,
102 };
103
104 let services = ConfigExpander::deserialize(&risedev_config)?;
105
106 let mut compose_services: BTreeMap<String, BTreeMap<String, ComposeService>> = BTreeMap::new();
107 let mut service_on_node: BTreeMap<String, String> = BTreeMap::new();
108 let mut volumes = BTreeMap::new();
109
110 let mut log_buffer = String::new();
111 use std::fmt::Write;
112
113 for service in &services {
114 let step = service.id();
115
116 let compose_deploy_config = compose_deploy_config.as_ref();
117 let (address, mut compose) = match service {
118 ServiceConfig::Minio(c) => {
119 volumes.insert(c.id.clone(), ComposeVolume::default());
120 (c.address.clone(), c.compose(&compose_config)?)
121 }
122 ServiceConfig::Sqlite(_) => continue,
123 ServiceConfig::Prometheus(c) => {
124 volumes.insert(c.id.clone(), ComposeVolume::default());
125 (c.address.clone(), c.compose(&compose_config)?)
126 }
127 ServiceConfig::ComputeNode(c) => {
128 volumes.insert(c.id.clone(), ComposeVolume::default());
129 (c.address.clone(), c.compose(&compose_config)?)
130 }
131 ServiceConfig::MetaNode(c) => {
132 if opts.deploy {
133 let public_ip = &compose_deploy_config
134 .unwrap()
135 .lookup_instance_by_host(&c.address)
136 .public_ip;
137 writeln!(
138 log_buffer,
139 "-- Dashboard --\nuse VSCode to forward {} from {}\nor use {}\n",
140 style(format!("{}", c.dashboard_port)).green(),
141 style(format!("ubuntu@{}", public_ip)).green(),
142 style(format!(
143 "ssh -N -L {}:localhost:{} ubuntu@{}",
144 c.dashboard_port, c.dashboard_port, public_ip
145 ))
146 .green()
147 )?;
148 }
149 (c.address.clone(), c.compose(&compose_config)?)
150 }
151 ServiceConfig::Frontend(c) => {
152 if opts.deploy {
153 let arg = format!("--frontend {} --frontend-port {}", c.address, c.port);
154 writeln!(
155 log_buffer,
156 "-- Frontend --\nAccess inside cluster: {}\ntpch-bench args: {}\n",
157 style(format!(
158 "psql -d dev -h {} -p {} -U root",
159 c.address, c.port
160 ))
161 .green(),
162 style(&arg).green()
163 )?;
164 fs_err::write(
165 Path::new(&opts.directory).join("tpch-bench-args-frontend"),
166 arg,
167 )?;
168 }
169 (c.address.clone(), c.compose(&compose_config)?)
170 }
171 ServiceConfig::Compactor(c) => (c.address.clone(), c.compose(&compose_config)?),
172 ServiceConfig::Grafana(c) => {
173 if opts.deploy {
174 let public_ip = &compose_deploy_config
175 .unwrap()
176 .lookup_instance_by_host(&c.address)
177 .public_ip;
178 writeln!(
179 log_buffer,
180 "-- Grafana --\nuse VSCode to forward {} from {}\nor use {}\n",
181 style(format!("{}", c.port)).green(),
182 style(format!("ubuntu@{}", public_ip)).green(),
183 style(format!(
184 "ssh -N -L {}:localhost:{} ubuntu@{}",
185 c.port, c.port, public_ip
186 ))
187 .green()
188 )?;
189 }
190 volumes.insert(c.id.clone(), ComposeVolume::default());
191 (c.address.clone(), c.compose(&compose_config)?)
192 }
193 ServiceConfig::Tempo(c) => (c.address.clone(), c.compose(&compose_config)?),
194 ServiceConfig::Kafka(_) => {
195 return Err(anyhow!("not supported, please use redpanda instead"));
196 }
197 ServiceConfig::Pubsub(_) => {
198 return Err(anyhow!("not supported, please use redpanda instead"));
199 }
200 ServiceConfig::Opendal(_) => continue,
201 ServiceConfig::AwsS3(_) => continue,
202 ServiceConfig::RedPanda(c) => {
203 if opts.deploy {
204 let arg = format!("--kafka-addr {}:{}", c.address, c.internal_port);
205 writeln!(
206 log_buffer,
207 "-- Redpanda --\ntpch-bench: {}\n",
208 style(&arg).green()
209 )?;
210 fs_err::write(
211 Path::new(&opts.directory).join("tpch-bench-args-kafka"),
212 arg,
213 )?;
214 }
215 volumes.insert(c.id.clone(), ComposeVolume::default());
216 (c.address.clone(), c.compose(&compose_config)?)
217 }
218 ServiceConfig::Redis(_)
219 | ServiceConfig::MySql(_)
220 | ServiceConfig::Postgres(_)
221 | ServiceConfig::SqlServer(_)
222 | ServiceConfig::SchemaRegistry(_) => return Err(anyhow!("not supported")),
223 };
224 compose.container_name = service.id().to_owned();
225 if opts.deploy {
226 compose.network_mode = Some("host".into());
227 compose.depends_on = vec![];
228 }
229 compose_services
230 .entry(address.clone())
231 .or_default()
232 .insert(step.to_owned(), compose);
233 service_on_node.insert(step.to_owned(), address);
234 }
235
236 if opts.deploy {
237 for (node, services) in &compose_services {
238 let mut node_volumes = BTreeMap::new();
239 services.keys().for_each(|k| {
240 if let Some(v) = volumes.get(k) {
241 node_volumes.insert(k.clone(), v.clone());
242 }
243 });
244 let compose_file = ComposeFile {
245 services: services.clone(),
246 volumes: node_volumes,
247 name: format!("risingwave-{}", opts.profile),
248 };
249
250 let yaml = serde_yaml::to_string(&compose_file)?;
251
252 let ec2_instance = compose_deploy_config
253 .as_ref()
254 .unwrap()
255 .lookup_instance_by_host(node);
256 if ec2_instance.r#type == "meta" {
257 let public_ip = &ec2_instance.public_ip;
258 writeln!(
259 log_buffer,
260 "-- Meta Node --\nLogin to meta node by {}\nor using VSCode {}\n",
261 style(format!("ssh ubuntu@{}", public_ip)).green(),
262 style(format!(
263 "code --remote ssh-remote+ubuntu@{} <path>",
264 public_ip
265 ))
266 .green()
267 )?;
268 }
269
270 fs_err::write(
271 Path::new(&opts.directory).join(format!("{}.yml", node)),
272 yaml,
273 )?;
274 }
275
276 if let Some(env) = Some(generate_risedev_env(&services)).filter(|x| !x.is_empty()) {
277 writeln!(log_buffer, "-- risedev-env --\n{}\n", style(env).green())?;
278 }
279
280 compose_deploy(
281 Path::new(&opts.directory),
282 &services.iter().map(|s| s.id().to_owned()).collect_vec(),
283 &compose_deploy_config.as_ref().unwrap().instances,
284 &compose_config,
285 &service_on_node,
286 )?;
287
288 println!("\n{}", log_buffer);
289
290 fs_err::write(
291 Path::new(&opts.directory).join("_message.partial.sh"),
292 log_buffer,
293 )?;
294 } else {
295 let mut services = BTreeMap::new();
296 for (_, s) in compose_services {
297 for (k, v) in s {
298 services.insert(k, v);
299 }
300 }
301 let compose_file = ComposeFile {
302 services,
303 volumes,
304 name: format!("risingwave-{}", opts.profile),
305 };
306
307 let yaml = serde_yaml::to_string(&compose_file)?;
308
309 fs_err::write(Path::new(&opts.directory).join("docker-compose.yml"), yaml)?;
310 }
311
312 Ok(())
313}