risedev/task/
task_pubsub_emu_ready_check.rs1use std::thread;
16use std::time::Duration;
17
18use anyhow::{Result, anyhow};
19use google_cloud_pubsub::client::Client;
20
21use crate::{ExecuteContext, PubsubConfig, Task};
22
23pub struct PubsubReadyTaskCheck {
24    config: PubsubConfig,
25}
26
27impl PubsubReadyTaskCheck {
28    pub fn new(config: PubsubConfig) -> Result<Self> {
29        Ok(Self { config })
30    }
31}
32
33impl Task for PubsubReadyTaskCheck {
35    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
36        ctx.pb.set_message("waiting for online...");
37
38        unsafe {
41            std::env::set_var(
42                "PUBSUB_EMULATOR_HOST",
43                format!("{}:{}", self.config.address, self.config.port),
44            );
45        }
46
47        thread::sleep(Duration::from_secs(5));
48        let async_runtime = tokio::runtime::Builder::new_current_thread()
49            .enable_time()
50            .enable_io()
51            .build()?;
52
53        let client = async_runtime.block_on(Client::new(Default::default()))?;
54
55        ctx.wait(|| {
56            async_runtime
57                .block_on(client.get_subscriptions(None))
58                .map_err(|e| anyhow!(e))?;
59            Ok(())
60        })?;
61
62        ctx.complete_spin();
63
64        Ok(())
65    }
66}