risedev/task/
pubsub_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::path::{Path, PathBuf};
17use std::process::Command;
18
19use anyhow::{Result, anyhow};
20
21use super::{ExecuteContext, Task};
22use crate::PubsubConfig;
23use crate::util::stylized_risedev_subcmd;
24
/// RiseDev task that launches the Pub/Sub emulator start script
/// (`start-pubsub-emulator.sh`) located under `$PREFIX_BIN/gcloud`.
pub struct PubsubService {
    /// Settings for this service; this file reads its `port` and `id` fields.
    config: PubsubConfig,
}
28
29impl PubsubService {
30    pub fn new(config: PubsubConfig) -> Result<Self> {
31        Ok(Self { config })
32    }
33
34    fn gcloud_path(&self) -> Result<PathBuf> {
35        let prefix_bin = env::var("PREFIX_BIN")?;
36        Ok(Path::new(&prefix_bin)
37            .join("gcloud")
38            .join("start-pubsub-emulator.sh"))
39    }
40
41    fn gcloud(&self) -> Result<Command> {
42        Ok(Command::new(self.gcloud_path()?))
43    }
44}
45
46impl Task for PubsubService {
47    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
48        ctx.service(self);
49        ctx.pb.set_message("starting...");
50
51        let path = self.gcloud_path()?;
52        if !path.exists() {
53            return Err(anyhow!(
54                "gcloud binary not found in {:?}\nDid you enable pubsub-emulator feature in `{}`?",
55                path,
56                stylized_risedev_subcmd("configure")
57            ));
58        }
59
60        let mut cmd = self.gcloud()?;
61        cmd.arg(format!("{}", self.config.port));
62
63        ctx.run_command(ctx.tmux_run(cmd)?)?;
64
65        ctx.pb.set_message("started");
66
67        Ok(())
68    }
69
70    fn id(&self) -> String {
71        self.config.id.clone()
72    }
73}