risedev/task/
task_kafka_ready_check.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use anyhow::{Context, Result};
18use rdkafka::ClientConfig;
19use rdkafka::config::FromClientConfig;
20use rdkafka::consumer::{BaseConsumer, Consumer};
21
22use crate::{ExecuteContext, KafkaConfig, Task};
23
24pub struct KafkaReadyCheckTask {
25    config: KafkaConfig,
26}
27
28impl KafkaReadyCheckTask {
29    pub fn new(config: KafkaConfig) -> Result<Self> {
30        Ok(Self { config })
31    }
32}
33
34impl Task for KafkaReadyCheckTask {
35    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
36        if self.config.user_managed {
37            ctx.pb
38                .set_message("waiting for user-managed service online...");
39        } else {
40            ctx.pb.set_message("waiting for online...");
41        }
42        let mut config = ClientConfig::new();
43        config.set(
44            "bootstrap.servers",
45            format!("{}:{}", self.config.address, self.config.port),
46        );
47
48        let rt = tokio::runtime::Builder::new_current_thread()
49            .enable_time()
50            .enable_io()
51            .build()?;
52        let consumer = rt.block_on(async {
53            BaseConsumer::from_config(&config)
54                .await
55                .context("failed to create consumer")
56        })?;
57
58        ctx.wait(|| {
59            rt.block_on(async {
60                let _metadata = consumer
61                    .fetch_metadata(None, Duration::from_secs(1))
62                    .await
63                    .context("failed to fetch metadata")?;
64                Ok(())
65            })
66        })?;
67
68        ctx.complete_spin();
69
70        Ok(())
71    }
72}