risedev/task/
task_pubsub_emu_ready_check.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::thread;
16use std::time::Duration;
17
18use anyhow::{Result, anyhow};
19use google_cloud_pubsub::client::Client;
20
21use crate::{ExecuteContext, PubsubConfig, Task};
22
23pub struct PubsubReadyTaskCheck {
24    config: PubsubConfig,
25}
26
27impl PubsubReadyTaskCheck {
28    pub fn new(config: PubsubConfig) -> Result<Self> {
29        Ok(Self { config })
30    }
31}
32
33// #[async_tr]
34impl Task for PubsubReadyTaskCheck {
35    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
36        ctx.pb.set_message("waiting for online...");
37
38        // environment variables to use the pubsub emulator
39        // SAFETY: RiseDev is for development purposes only.
40        unsafe {
41            std::env::set_var(
42                "PUBSUB_EMULATOR_HOST",
43                format!("{}:{}", self.config.address, self.config.port),
44            );
45        }
46
47        thread::sleep(Duration::from_secs(5));
48        let async_runtime = tokio::runtime::Builder::new_current_thread()
49            .enable_time()
50            .enable_io()
51            .build()?;
52
53        let client = async_runtime.block_on(Client::new(Default::default()))?;
54
55        ctx.wait(|| {
56            async_runtime
57                .block_on(client.get_subscriptions(None))
58                .map_err(|e| anyhow!(e))?;
59            Ok(())
60        })?;
61
62        ctx.complete_spin();
63
64        Ok(())
65    }
66}