risedev/task/
task_kafka_ready_check.rs1use std::time::Duration;
16
17use anyhow::{Context, Result};
18use rdkafka::ClientConfig;
19use rdkafka::config::FromClientConfig;
20use rdkafka::consumer::{BaseConsumer, Consumer};
21
22use crate::{ExecuteContext, KafkaConfig, Task};
23
24pub struct KafkaReadyCheckTask {
25 config: KafkaConfig,
26}
27
28impl KafkaReadyCheckTask {
29 pub fn new(config: KafkaConfig) -> Result<Self> {
30 Ok(Self { config })
31 }
32}
33
34impl Task for KafkaReadyCheckTask {
35 fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
36 if self.config.user_managed {
37 ctx.pb
38 .set_message("waiting for user-managed service online...");
39 } else {
40 ctx.pb.set_message("waiting for online...");
41 }
42 let mut config = ClientConfig::new();
43 config.set(
44 "bootstrap.servers",
45 format!("{}:{}", self.config.address, self.config.port),
46 );
47
48 let rt = tokio::runtime::Builder::new_current_thread()
49 .enable_time()
50 .enable_io()
51 .build()?;
52 let consumer = rt.block_on(async {
53 BaseConsumer::from_config(&config)
54 .await
55 .context("failed to create consumer")
56 })?;
57
58 ctx.wait(|| {
59 rt.block_on(async {
60 let _metadata = consumer
61 .fetch_metadata(None, Duration::from_secs(1))
62 .await
63 .context("failed to fetch metadata")?;
64 Ok(())
65 })
66 })?;
67
68 ctx.complete_spin();
69
70 Ok(())
71 }
72}