risedev/
compose_deploy.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::os::unix::prelude::PermissionsExt;
17use std::path::Path;
18
19use anyhow::Result;
20
21use crate::ComposeConfig;
22
/// One EC2 machine that a compose deployment targets.
///
/// Field names are (de)serialized in kebab-case (`dns-host`, `public-ip`, ...),
/// and unknown keys are rejected on deserialization.
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct Ec2Instance {
    /// Identifier of the instance; used in log lines and in the names of the
    /// generated `_*.{id}.partial.sh` scripts.
    pub id: String,
    /// DNS host name of the instance; `compose_deploy` matches it against the
    /// values of `service_on_node` and uses it as the `{host}.yml` file name.
    pub dns_host: String,
    /// Address used as the `ssh` / `rsync` / `scp` target (`ubuntu@{public_ip}`).
    pub public_ip: String,
    /// Instance role. `compose_deploy` removes docker volumes on tear-down when
    /// this is `"source"` or `"compute"`.
    pub r#type: String,
}
32
/// Deployment configuration for `compose-deploy`.
///
/// Field names are (de)serialized in kebab-case, and unknown keys are rejected
/// on deserialization.
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct ComposeDeployConfig {
    /// The EC2 instances available to the deployment.
    pub instances: Vec<Ec2Instance>,
    /// Optional image override.
    // NOTE(review): presumably replaces the default RisingWave image when set;
    // the consumer is not in this part of the file -- confirm.
    pub risingwave_image_override: Option<String>,
    /// Extra key/value arguments.
    // NOTE(review): presumably forwarded to risedev; the consumer is not in
    // this part of the file -- confirm.
    pub risedev_extra_args: HashMap<String, String>,
}
41
42impl ComposeDeployConfig {
43    pub fn lookup_instance_by_id(&self, id: &str) -> &Ec2Instance {
44        self.instances.iter().find(|i| i.id == id).unwrap()
45    }
46
47    pub fn lookup_instance_by_host(&self, host: &str) -> &Ec2Instance {
48        self.instances.iter().find(|i| i.dns_host == host).unwrap()
49    }
50}
51
52pub fn compose_deploy(
53    output_directory: &Path,
54    steps: &[String],
55    ec2_instances: &[Ec2Instance],
56    compose_config: &ComposeConfig,
57    service_on_node: &BTreeMap<String, String>,
58) -> Result<()> {
59    let shell_script = {
60        use std::fmt::Write;
61        let ssh_extra_args = "-o \"UserKnownHostsFile=/dev/null\" -o \"StrictHostKeyChecking=no\" -o \"LogLevel=ERROR\"";
62        let mut x = String::new();
63        writeln!(x, "#!/usr/bin/env bash -e")?;
64        writeln!(x)?;
65        writeln!(
66            x,
67            r#"
68DIR="$( cd "$( dirname "${{BASH_SOURCE[0]}}" )" >/dev/null 2>&1 && pwd )"
69cd "$DIR""#
70        )?;
71
72        writeln!(x)?;
73
74        writeln!(
75            x,
76            r#"
77DO_ALL_STEPS=1
78
79while getopts '1234' OPT; do
80    case $OPT in
81        1)
82            DO_SYNC=1
83            DO_ALL_STEPS=0
84            ;;
85        2)
86            DO_TEAR_DOWN=1
87            DO_ALL_STEPS=0
88            ;;
89        3)
90            DO_START=1
91            DO_ALL_STEPS=0
92            ;;
93        4)
94            DO_CHECK=1
95            DO_ALL_STEPS=0
96        ;;
97    esac
98done
99
100if [[ "$DO_ALL_STEPS" -eq 1 ]]; then
101DO_SYNC=1
102DO_TEAR_DOWN=1
103DO_START=1
104DO_CHECK=1
105fi
106"#
107        )?;
108        writeln!(x, r#"if [[ "$DO_SYNC" -eq 1 ]]; then"#)?;
109        writeln!(x, "# --- Sync Config ---")?;
110        writeln!(x, r#"echo "$(tput setaf 2)(1/4) sync config$(tput sgr0)""#)?;
111        writeln!(
112            x,
113            "echo -e \"If this step takes too long time, maybe EC2 IP has been changed. You'll need to re-run:\\n* $(tput setaf 2)terraform apply$(tput sgr0) to get the latest IP,\\n* and then $(tput setaf 2)./risedev compose-deploy <profile>$(tput sgr0) again to update the deploy script.\""
114        )?;
115        writeln!(x, "parallel --linebuffer bash << EOF")?;
116        for instance in ec2_instances {
117            let host = &instance.dns_host;
118            let public_ip = &instance.public_ip;
119            let id = &instance.id;
120            let base_folder = "~/risingwave-deploy";
121            let mut y = String::new();
122            writeln!(y, "#!/usr/bin/env bash -e")?;
123            writeln!(y)?;
124            writeln!(
125                y,
126                r#"echo "{id}: $(tput setaf 2)start sync config$(tput sgr0)""#,
127            )?;
128            writeln!(
129                y,
130                "rsync -azh -e \"ssh {ssh_extra_args}\" ./ ubuntu@{public_ip}:{base_folder} --exclude 'deploy.sh' --exclude '*.partial.sh'",
131            )?;
132            writeln!(
133                y,
134                "scp {ssh_extra_args} ./{host}.yml ubuntu@{public_ip}:{base_folder}/docker-compose.yaml"
135            )?;
136            writeln!(
137                y,
138                r#"echo "{id}: $(tput setaf 2)done sync config$(tput sgr0)""#,
139            )?;
140            let sh = format!("_deploy.{id}.partial.sh");
141            fs_err::write(Path::new(output_directory).join(&sh), y)?;
142            writeln!(x, "{sh}")?;
143        }
144        writeln!(x, "EOF")?;
145        writeln!(x, r#"fi"#)?;
146        writeln!(x)?;
147        writeln!(x, r#"if [[ "$DO_TEAR_DOWN" -eq 1 ]]; then"#)?;
148        writeln!(x, "# --- Tear Down Services ---")?;
149        writeln!(
150            x,
151            r#"echo "$(tput setaf 2)(2/4) stop services and pull latest image$(tput sgr0)""#,
152        )?;
153        writeln!(x, "parallel --linebuffer bash << EOF")?;
154        for instance in ec2_instances {
155            let id = &instance.id;
156            let mut y = String::new();
157            writeln!(y, "#!/usr/bin/env bash -e")?;
158            writeln!(
159                y,
160                r#"echo "{id}: $(tput setaf 2)stopping and pulling$(tput sgr0)""#,
161            )?;
162            let public_ip = &instance.public_ip;
163            let base_folder = "~/risingwave-deploy";
164            let tear_down_volumes = instance.r#type == "source" || instance.r#type == "compute";
165            let down_extra_arg = if tear_down_volumes {
166                // tear down volumes for source and compute
167                " -v"
168            } else {
169                ""
170            };
171            writeln!(
172                y,
173                "ssh {ssh_extra_args} ubuntu@{public_ip} \"bash -c 'cd {base_folder} && docker compose kill && docker compose down --remove-orphans{down_extra_arg} && docker pull {}'\"",
174                compose_config.image.risingwave
175            )?;
176            if tear_down_volumes {
177                writeln!(
178                    y,
179                    r#"echo "{id}: $(tput setaf 2)done tear down (along with volumes)$(tput sgr0)""#
180                )?;
181            } else {
182                writeln!(
183                    y,
184                    r#"echo "{id}: $(tput setaf 2)done tear down$(tput sgr0)""#
185                )?;
186            }
187
188            let sh = format!("_stop.{id}.partial.sh");
189            fs_err::write(Path::new(output_directory).join(&sh), y)?;
190            writeln!(x, "{sh}")?;
191        }
192        writeln!(x, "EOF")?;
193        writeln!(x, r#"fi"#)?;
194        writeln!(x)?;
195        writeln!(x, r#"if [[ "$DO_START" -eq 1 ]]; then"#)?;
196        writeln!(x, "# --- Start Services ---")?;
197        writeln!(
198            x,
199            r#"echo "$(tput setaf 2)(3/4) start services$(tput sgr0)""#,
200        )?;
201        for step in steps {
202            let dns_host = if let Some(dns_host) = service_on_node.get(step) {
203                dns_host
204            } else {
205                // pseudo-services like s3, skip.
206                continue;
207            };
208            let instance = ec2_instances
209                .iter()
210                .find(|ec2| &ec2.dns_host == dns_host)
211                .unwrap();
212            let id = &instance.id;
213            writeln!(
214                x,
215                r#"echo "{id}: $(tput setaf 2)start service {step}$(tput sgr0)""#,
216            )?;
217            let public_ip = &instance.public_ip;
218            let base_folder = "~/risingwave-deploy";
219            writeln!(
220                x,
221                "ssh {ssh_extra_args} ubuntu@{public_ip} \"bash -c 'cd {base_folder} && docker compose up -d {step}'\""
222            )?;
223        }
224        writeln!(x, r#"fi"#)?;
225        writeln!(x)?;
226        writeln!(x, r#"if [[ "$DO_CHECK" -eq 1 ]]; then"#)?;
227        writeln!(x, "# --- Check Services ---")?;
228        writeln!(
229            x,
230            r#"echo "$(tput setaf 2)(4/4) check service started$(tput sgr0)""#
231        )?;
232        writeln!(x, "parallel --linebuffer bash << EOF")?;
233        for instance in ec2_instances {
234            let id = &instance.id;
235            let mut y = String::new();
236            writeln!(y, "#!/usr/bin/env bash -e")?;
237            writeln!(y, r#"echo "{id}: $(tput setaf 2)check status$(tput sgr0)""#,)?;
238            let public_ip = &instance.public_ip;
239            let base_folder = "~/risingwave-deploy";
240            writeln!(
241                y,
242                "ssh {ssh_extra_args} ubuntu@{public_ip} \"bash -c 'cd {base_folder} && docker compose ps'\""
243            )?;
244
245            let sh = format!("_check.{id}.partial.sh");
246            fs_err::write(Path::new(output_directory).join(&sh), y)?;
247            writeln!(x, "{sh}")?;
248        }
249        writeln!(x, "EOF")?;
250        writeln!(x, r#"fi"#)?;
251        x
252    };
253    let deploy_sh = Path::new(output_directory).join("deploy.sh");
254    fs_err::write(&deploy_sh, shell_script)?;
255    let mut perms = fs_err::metadata(&deploy_sh)?.permissions();
256    perms.set_mode(perms.mode() | 0o755);
257    fs_err::set_permissions(&deploy_sh, perms)?;
258    Ok(())
259}