risedev/
compose_deploy.rs1use std::collections::{BTreeMap, HashMap};
16use std::os::unix::prelude::PermissionsExt;
17use std::path::Path;
18
19use anyhow::Result;
20
21use crate::ComposeConfig;
22
23#[derive(serde::Serialize, serde::Deserialize)]
24#[serde(rename_all = "kebab-case")]
25#[serde(deny_unknown_fields)]
26pub struct Ec2Instance {
27 pub id: String,
28 pub dns_host: String,
29 pub public_ip: String,
30 pub r#type: String,
31}
32
33#[derive(serde::Serialize, serde::Deserialize)]
34#[serde(rename_all = "kebab-case")]
35#[serde(deny_unknown_fields)]
36pub struct ComposeDeployConfig {
37 pub instances: Vec<Ec2Instance>,
38 pub risingwave_image_override: Option<String>,
39 pub risedev_extra_args: HashMap<String, String>,
40}
41
42impl ComposeDeployConfig {
43 pub fn lookup_instance_by_id(&self, id: &str) -> &Ec2Instance {
44 self.instances.iter().find(|i| i.id == id).unwrap()
45 }
46
47 pub fn lookup_instance_by_host(&self, host: &str) -> &Ec2Instance {
48 self.instances.iter().find(|i| i.dns_host == host).unwrap()
49 }
50}
51
52pub fn compose_deploy(
53 output_directory: &Path,
54 steps: &[String],
55 ec2_instances: &[Ec2Instance],
56 compose_config: &ComposeConfig,
57 service_on_node: &BTreeMap<String, String>,
58) -> Result<()> {
59 let shell_script = {
60 use std::fmt::Write;
61 let ssh_extra_args = "-o \"UserKnownHostsFile=/dev/null\" -o \"StrictHostKeyChecking=no\" -o \"LogLevel=ERROR\"";
62 let mut x = String::new();
63 writeln!(x, "#!/usr/bin/env bash -e")?;
64 writeln!(x)?;
65 writeln!(
66 x,
67 r#"
68DIR="$( cd "$( dirname "${{BASH_SOURCE[0]}}" )" >/dev/null 2>&1 && pwd )"
69cd "$DIR""#
70 )?;
71
72 writeln!(x)?;
73
74 writeln!(
75 x,
76 r#"
77DO_ALL_STEPS=1
78
79while getopts '1234' OPT; do
80 case $OPT in
81 1)
82 DO_SYNC=1
83 DO_ALL_STEPS=0
84 ;;
85 2)
86 DO_TEAR_DOWN=1
87 DO_ALL_STEPS=0
88 ;;
89 3)
90 DO_START=1
91 DO_ALL_STEPS=0
92 ;;
93 4)
94 DO_CHECK=1
95 DO_ALL_STEPS=0
96 ;;
97 esac
98done
99
100if [[ "$DO_ALL_STEPS" -eq 1 ]]; then
101DO_SYNC=1
102DO_TEAR_DOWN=1
103DO_START=1
104DO_CHECK=1
105fi
106"#
107 )?;
108 writeln!(x, r#"if [[ "$DO_SYNC" -eq 1 ]]; then"#)?;
109 writeln!(x, "# --- Sync Config ---")?;
110 writeln!(x, r#"echo "$(tput setaf 2)(1/4) sync config$(tput sgr0)""#)?;
111 writeln!(
112 x,
113 "echo -e \"If this step takes too long time, maybe EC2 IP has been changed. You'll need to re-run:\\n* $(tput setaf 2)terraform apply$(tput sgr0) to get the latest IP,\\n* and then $(tput setaf 2)./risedev compose-deploy <profile>$(tput sgr0) again to update the deploy script.\""
114 )?;
115 writeln!(x, "parallel --linebuffer bash << EOF")?;
116 for instance in ec2_instances {
117 let host = &instance.dns_host;
118 let public_ip = &instance.public_ip;
119 let id = &instance.id;
120 let base_folder = "~/risingwave-deploy";
121 let mut y = String::new();
122 writeln!(y, "#!/usr/bin/env bash -e")?;
123 writeln!(y)?;
124 writeln!(
125 y,
126 r#"echo "{id}: $(tput setaf 2)start sync config$(tput sgr0)""#,
127 )?;
128 writeln!(
129 y,
130 "rsync -azh -e \"ssh {ssh_extra_args}\" ./ ubuntu@{public_ip}:{base_folder} --exclude 'deploy.sh' --exclude '*.partial.sh'",
131 )?;
132 writeln!(
133 y,
134 "scp {ssh_extra_args} ./{host}.yml ubuntu@{public_ip}:{base_folder}/docker-compose.yaml"
135 )?;
136 writeln!(
137 y,
138 r#"echo "{id}: $(tput setaf 2)done sync config$(tput sgr0)""#,
139 )?;
140 let sh = format!("_deploy.{id}.partial.sh");
141 fs_err::write(Path::new(output_directory).join(&sh), y)?;
142 writeln!(x, "{sh}")?;
143 }
144 writeln!(x, "EOF")?;
145 writeln!(x, r#"fi"#)?;
146 writeln!(x)?;
147 writeln!(x, r#"if [[ "$DO_TEAR_DOWN" -eq 1 ]]; then"#)?;
148 writeln!(x, "# --- Tear Down Services ---")?;
149 writeln!(
150 x,
151 r#"echo "$(tput setaf 2)(2/4) stop services and pull latest image$(tput sgr0)""#,
152 )?;
153 writeln!(x, "parallel --linebuffer bash << EOF")?;
154 for instance in ec2_instances {
155 let id = &instance.id;
156 let mut y = String::new();
157 writeln!(y, "#!/usr/bin/env bash -e")?;
158 writeln!(
159 y,
160 r#"echo "{id}: $(tput setaf 2)stopping and pulling$(tput sgr0)""#,
161 )?;
162 let public_ip = &instance.public_ip;
163 let base_folder = "~/risingwave-deploy";
164 let tear_down_volumes = instance.r#type == "source" || instance.r#type == "compute";
165 let down_extra_arg = if tear_down_volumes {
166 " -v"
168 } else {
169 ""
170 };
171 writeln!(
172 y,
173 "ssh {ssh_extra_args} ubuntu@{public_ip} \"bash -c 'cd {base_folder} && docker compose kill && docker compose down --remove-orphans{down_extra_arg} && docker pull {}'\"",
174 compose_config.image.risingwave
175 )?;
176 if tear_down_volumes {
177 writeln!(
178 y,
179 r#"echo "{id}: $(tput setaf 2)done tear down (along with volumes)$(tput sgr0)""#
180 )?;
181 } else {
182 writeln!(
183 y,
184 r#"echo "{id}: $(tput setaf 2)done tear down$(tput sgr0)""#
185 )?;
186 }
187
188 let sh = format!("_stop.{id}.partial.sh");
189 fs_err::write(Path::new(output_directory).join(&sh), y)?;
190 writeln!(x, "{sh}")?;
191 }
192 writeln!(x, "EOF")?;
193 writeln!(x, r#"fi"#)?;
194 writeln!(x)?;
195 writeln!(x, r#"if [[ "$DO_START" -eq 1 ]]; then"#)?;
196 writeln!(x, "# --- Start Services ---")?;
197 writeln!(
198 x,
199 r#"echo "$(tput setaf 2)(3/4) start services$(tput sgr0)""#,
200 )?;
201 for step in steps {
202 let dns_host = if let Some(dns_host) = service_on_node.get(step) {
203 dns_host
204 } else {
205 continue;
207 };
208 let instance = ec2_instances
209 .iter()
210 .find(|ec2| &ec2.dns_host == dns_host)
211 .unwrap();
212 let id = &instance.id;
213 writeln!(
214 x,
215 r#"echo "{id}: $(tput setaf 2)start service {step}$(tput sgr0)""#,
216 )?;
217 let public_ip = &instance.public_ip;
218 let base_folder = "~/risingwave-deploy";
219 writeln!(
220 x,
221 "ssh {ssh_extra_args} ubuntu@{public_ip} \"bash -c 'cd {base_folder} && docker compose up -d {step}'\""
222 )?;
223 }
224 writeln!(x, r#"fi"#)?;
225 writeln!(x)?;
226 writeln!(x, r#"if [[ "$DO_CHECK" -eq 1 ]]; then"#)?;
227 writeln!(x, "# --- Check Services ---")?;
228 writeln!(
229 x,
230 r#"echo "$(tput setaf 2)(4/4) check service started$(tput sgr0)""#
231 )?;
232 writeln!(x, "parallel --linebuffer bash << EOF")?;
233 for instance in ec2_instances {
234 let id = &instance.id;
235 let mut y = String::new();
236 writeln!(y, "#!/usr/bin/env bash -e")?;
237 writeln!(y, r#"echo "{id}: $(tput setaf 2)check status$(tput sgr0)""#,)?;
238 let public_ip = &instance.public_ip;
239 let base_folder = "~/risingwave-deploy";
240 writeln!(
241 y,
242 "ssh {ssh_extra_args} ubuntu@{public_ip} \"bash -c 'cd {base_folder} && docker compose ps'\""
243 )?;
244
245 let sh = format!("_check.{id}.partial.sh");
246 fs_err::write(Path::new(output_directory).join(&sh), y)?;
247 writeln!(x, "{sh}")?;
248 }
249 writeln!(x, "EOF")?;
250 writeln!(x, r#"fi"#)?;
251 x
252 };
253 let deploy_sh = Path::new(output_directory).join("deploy.sh");
254 fs_err::write(&deploy_sh, shell_script)?;
255 let mut perms = fs_err::metadata(&deploy_sh)?.permissions();
256 perms.set_mode(perms.mode() | 0o755);
257 fs_err::set_permissions(&deploy_sh, perms)?;
258 Ok(())
259}