risingwave_connector/source/kafka/enumerator.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use moka::ops::compute::Op;
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
use risingwave_common::bail;
use risingwave_common::metrics::LabelGuardedIntGauge;

use crate::error::{ConnectorError, ConnectorResult};
use crate::source::SourceEnumeratorContextRef;
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
    RwConsumerContext,
};

type KafkaConsumer = BaseConsumer<RwConsumerContext>;
type KafkaAdmin = AdminClient<RwConsumerContext>;

/// The consumer client is shared and the cache doesn't manage its lifecycle, so we store `Weak` references with no eviction policy.
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());
/// The admin client is short-lived, so we store `Arc` references and set a time-to-idle eviction policy.
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
    LazyLock::new(|| {
        moka::future::Cache::builder()
            .time_to_idle(Duration::from_secs(5 * 60))
            .build()
    });

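/// Position the enumerator resolves offsets from: the earliest or latest available offset, a
/// timestamp in milliseconds, or `None` to leave the offset unspecified.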
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
    Earliest,
    Latest,
    Timestamp(i64),
    None,
}

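/// Split enumerator for a single Kafka topic: it discovers the topic's partitions and assigns
/// each one a [`KafkaSplit`] with start/stop offsets, reusing a consumer client shared through
/// [`SHARED_KAFKA_CONSUMER`].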
pub struct KafkaSplitEnumerator {
    context: SourceEnumeratorContextRef,
    broker_address: String,
    topic: String,
    client: Arc<KafkaConsumer>,
    start_offset: KafkaEnumeratorOffset,

    // may be used in the future for batch processing
    stop_offset: KafkaEnumeratorOffset,

    sync_call_timeout: Duration,
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge<2>>,

    properties: KafkaProperties,
    config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {
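    /// Deletes the consumer groups belonging to the given fragments, using an admin client cached
    /// in [`SHARED_KAFKA_ADMIN`].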
    async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        let admin = SHARED_KAFKA_ADMIN
            .try_get_with_by_ref(&self.properties.connection, async {
                tracing::info!("build new kafka admin for {}", self.broker_address);
                Ok(Arc::new(
                    build_kafka_admin(&self.config, &self.properties).await?,
                ))
            })
            .await?;

        let group_ids = fragment_ids
            .iter()
            .map(|fragment_id| self.properties.group_id(*fragment_id))
            .collect::<Vec<_>>();
        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
        let res = admin
            .delete_groups(&group_ids, &AdminOptions::default())
            .await?;
        tracing::debug!(
            topic = self.topic,
            ?fragment_ids,
            "delete groups result: {res:?}"
        );
        Ok(())
    }
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

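    /// Builds the enumerator for the configured topic, reusing a cached consumer client for the
    /// same connection if one is still alive and resolving the startup offset from the source
    /// properties.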
    async fn new(
        properties: KafkaProperties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<KafkaSplitEnumerator> {
        let mut config = rdkafka::ClientConfig::new();
        let common_props = &properties.common;

        let broker_address = properties.connection.brokers.clone();
        let topic = common_props.topic.clone();
        config.set("bootstrap.servers", &broker_address);
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);
        let mut scan_start_offset = match properties
            .scan_startup_mode
            .as_ref()
            .map(|s| s.to_lowercase())
            .as_deref()
        {
            Some("earliest") => KafkaEnumeratorOffset::Earliest,
            Some("latest") => KafkaEnumeratorOffset::Latest,
            None => KafkaEnumeratorOffset::Earliest,
            _ => bail!(
                "property `scan_startup_mode` only supports `earliest` and `latest`, or leaving it empty"
            ),
        };

        if let Some(s) = &properties.time_offset {
            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
        }

        let mut client: Option<Arc<KafkaConsumer>> = None;
        SHARED_KAFKA_CONSUMER
            .entry_by_ref(&properties.connection)
            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
                if let Some(entry) = maybe_entry {
                    let entry_value = entry.into_value();
                    if let Some(client_) = entry_value.upgrade() {
                        // return if the client is already built
                        tracing::info!("reuse existing kafka client for {}", broker_address);
                        client = Some(client_);
                        return Ok(Op::Nop);
                    }
                }
                tracing::info!("build new kafka client for {}", broker_address);
                client = Some(build_kafka_client(&config, &properties).await?);
                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
            })
            .await?;

        Ok(Self {
            context,
            broker_address,
            topic,
            client: client.unwrap(),
            start_offset: scan_start_offset,
            stop_offset: KafkaEnumeratorOffset::None,
            sync_call_timeout: properties.common.sync_call_timeout,
            high_watermark_metrics: HashMap::new(),
            properties,
            config,
        })
    }

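    /// Fetches the topic's partitions and produces one [`KafkaSplit`] per partition, with start
    /// and stop offsets resolved from the configured enumerator offsets and the current
    /// watermarks.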
    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
        let mut start_offsets = self
            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let mut stop_offsets = self
            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let ret: Vec<_> = topic_partitions
            .into_iter()
            .map(|partition| KafkaSplit {
                topic: self.topic.clone(),
                partition,
                start_offset: start_offsets.remove(&partition).unwrap(),
                stop_offset: stop_offsets.remove(&partition).unwrap(),
            })
            .collect();

        Ok(ret)
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}

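/// Creates a consumer client with the RisingWave consumer context (broker rewrite map, AWS auth),
/// polling once when AWS MSK IAM is enabled so that an initial OAUTHBEARER token gets fetched.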
async fn build_kafka_client(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<Arc<KafkaConsumer>> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaConsumer = config.create_with_context(client_ctx).await?;

    // Note that before any SASL/OAUTHBEARER broker connection can succeed, the application must
    // call rd_kafka_oauthbearer_set_token() once, either directly or, more typically, by invoking
    // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc., to trigger retrieval
    // of an initial token.
    // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf
    if properties.connection.is_aws_msk_iam() {
        #[cfg(not(madsim))]
        client.poll(Duration::from_secs(10)); // note: this is a blocking call
        #[cfg(madsim)]
        client.poll(Duration::from_secs(10)).await;
    }
    Ok(Arc::new(client))
}
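
/// Creates an admin client with the same consumer context as [`build_kafka_client`]. No initial
/// poll is needed here: `AdminClient` starts its own poll thread on creation.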
async fn build_kafka_admin(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<KafkaAdmin> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
    // AdminClient calls start_poll_thread on creation, so an additional poll seems unnecessary
    // (and there is currently no API for it anyway).
    Ok(client)
}

impl KafkaSplitEnumerator {
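    /// Fetches the low/high watermark of every given partition and reports each high watermark
    /// to the metrics.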
    async fn get_watermarks(
        &mut self,
        partitions: &[i32],
    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
        let mut map = HashMap::new();
        for partition in partitions {
            let (low, high) = self
                .client
                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                .await?;
            self.report_high_watermark(*partition, high);
            map.insert(*partition, (low, high));
        }
        tracing::debug!("fetch kafka watermarks: {map:?}");
        Ok(map)
    }

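    /// Lists splits for a batch read over an optional timestamp range. Offsets resolved from the
    /// timestamps are clamped to each partition's low/high watermark; a missing timestamp falls
    /// back to the watermark itself.
    ///
    /// A minimal usage sketch (the `enumerator` and `now_ms` bindings are hypothetical):
    ///
    /// ```ignore
    /// // Splits covering roughly the last hour of data.
    /// let splits = enumerator
    ///     .list_splits_batch(Some(now_ms - 3_600_000), Some(now_ms))
    ///     .await?;
    /// ```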
    pub async fn list_splits_batch(
        &mut self,
        expect_start_timestamp_millis: Option<i64>,
        expect_stop_timestamp_millis: Option<i64>,
    ) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        // Resolve the start and stop offset for each partition from the given timestamps. If a
        // timestamp is None, the partition's low/high watermark is used as the start/stop offset;
        // if a timestamp is provided, the watermarks are still used below to narrow down the range.
        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts)
                    .await?,
            )
        } else {
            None
        };

        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts)
                    .await?,
            )
        } else {
            None
        };

        // Watermark here has nothing to do with watermark in streaming processing. Watermark
        // here means smallest/largest offset available for reading.
        let mut watermarks = {
            let mut ret = HashMap::new();
            for partition in &topic_partitions {
                let (low, high) = self
                    .client
                    .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                    .await?;
                ret.insert(partition, (low - 1, high));
            }
            ret
        };

        Ok(topic_partitions
            .iter()
            .map(|partition| {
                let (low, high) = watermarks.remove(&partition).unwrap();
                let start_offset = {
                    let start = expect_start_offset
                        .as_mut()
                        .map(|m| m.remove(partition).flatten().map(|t| t - 1).unwrap_or(low))
                        .unwrap_or(low);
                    i64::max(start, low)
                };
                let stop_offset = {
                    let stop = expect_stop_offset
                        .as_mut()
                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
                        .unwrap_or(Some(high))
                        .unwrap_or(high);
                    i64::min(stop, high)
                };

                if start_offset > stop_offset {
                    tracing::warn!(
                        "topic {} partition {}: requested start offset {} is greater than stop offset {}",
                        self.topic,
                        partition,
                        start_offset,
                        stop_offset
                    );
                }
                KafkaSplit {
                    topic: self.topic.clone(),
                    partition: *partition,
                    start_offset: Some(start_offset),
                    stop_offset: Some(stop_offset),
                }
            })
            .collect::<Vec<KafkaSplit>>())
    }

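    /// Resolves the stop offset for each partition according to `self.stop_offset`: the high
    /// watermark for `Latest`, a timestamp lookup for `Timestamp`, and `None` (unbounded)
    /// otherwise.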
    async fn fetch_stop_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.stop_offset {
            KafkaEnumeratorOffset::Earliest => unreachable!(),
            KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (_, high_watermark) = watermarks.get(partition).unwrap();
                    map.insert(*partition, Some(*high_watermark));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time).await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

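    /// Resolves the start offset for each partition according to `self.start_offset`, using the
    /// pre-fetched watermarks or a timestamp lookup. Stored start offsets follow the "last
    /// consumed offset" convention, hence the `- 1` adjustments.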
    async fn fetch_start_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.start_offset {
            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
                    let offset = match self.start_offset {
                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
                        _ => unreachable!(),
                    };
                    map.insert(*partition, Some(offset));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time).await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

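    /// Looks up, for each partition, the earliest offset whose timestamp is at or after `time`
    /// (in milliseconds), returned minus one to match the "last consumed offset" convention.
    /// Partitions without such an offset fall back to their high watermark.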
    async fn fetch_offset_for_time(
        &self,
        partitions: &[i32],
        time: i64,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        let mut tpl = TopicPartitionList::new();

        for partition in partitions {
            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
        }

        let offsets = self
            .client
            .offsets_for_times(tpl, self.sync_call_timeout)
            .await?;

        let mut result = HashMap::with_capacity(partitions.len());

        for elem in offsets.elements_for_topic(self.topic.as_str()) {
            match elem.offset() {
                Offset::Offset(offset) => {
                    // XXX(rc): currently in RW source, `offset` means the last consumed offset, so we need to subtract 1
                    result.insert(elem.partition(), Some(offset - 1));
                }
                _ => {
                    let (_, high_watermark) = self
                        .client
                        .fetch_watermarks(
                            self.topic.as_str(),
                            elem.partition(),
                            self.sync_call_timeout,
                        )
                        .await?;
                    result.insert(elem.partition(), Some(high_watermark));
                }
            }
        }

        Ok(result)
    }

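    /// Reports the partition's high watermark to the per-partition gauge, creating the labeled
    /// metric on first use.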
    #[inline]
    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
        let high_watermark_metrics =
            self.high_watermark_metrics
                .entry(partition)
                .or_insert_with(|| {
                    self.context
                        .metrics
                        .high_watermark
                        .with_guarded_label_values(&[
                            &self.context.info.source_id.to_string(),
                            &partition.to_string(),
                        ])
                });
        high_watermark_metrics.set(offset);
    }

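    /// Returns whether the topic's metadata can currently be fetched from the brokers.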
    pub async fn check_reachability(&self) -> bool {
        self.client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await
            .is_ok()
    }

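    /// Fetches the topic's metadata and returns its partition ids, failing if the topic does not
    /// exist or reports no partitions.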
    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
        // for now, we only support one topic
        let metadata = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;

        let topic_meta = match metadata.topics() {
            [meta] => meta,
            _ => bail!("topic {} not found", self.topic),
        };

        if topic_meta.partitions().is_empty() {
            bail!("topic {} not found", self.topic);
        }

        Ok(topic_meta
            .partitions()
            .iter()
            .map(|partition| partition.id())
            .collect())
    }
}