risingwave_connector/source/kafka/enumerator.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use moka::ops::compute::Op;
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
use risingwave_common::bail;
use risingwave_common::metrics::LabelGuardedIntGauge;

use crate::connector_common::read_kafka_log_level;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::SourceEnumeratorContextRef;
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
    RwConsumerContext,
};

type KafkaConsumer = BaseConsumer<RwConsumerContext>;
type KafkaAdmin = AdminClient<RwConsumerContext>;

/// The consumer client is shared across enumerators and the cache does not manage its
/// lifecycle, so we store `Weak` references and apply no eviction policy.
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());
/// The admin client is short-lived, so we store `Arc` references and set a time-to-idle
/// eviction policy.
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
    LazyLock::new(|| {
        moka::future::Cache::builder()
            .time_to_idle(Duration::from_secs(5 * 60))
            .build()
    });
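// Note: once every enumerator holding the shared `Arc<KafkaConsumer>` is dropped, the `Weak`
// stored in `SHARED_KAFKA_CONSUMER` can no longer be upgraded, and the next call to
// `KafkaSplitEnumerator::new` transparently builds a fresh client (see `and_try_compute_with`
// in `new` below).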

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
    Earliest,
    Latest,
    Timestamp(i64),
    None,
}

pub struct KafkaSplitEnumerator {
    context: SourceEnumeratorContextRef,
    broker_address: String,
    topic: String,
    client: Arc<KafkaConsumer>,
    start_offset: KafkaEnumeratorOffset,

    // maybe used in the future for batch processing
    stop_offset: KafkaEnumeratorOffset,

    sync_call_timeout: Duration,
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge>,

    properties: KafkaProperties,
    config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {
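    /// Deletes the Kafka consumer groups owned by the given fragments (one group per fragment
    /// id, named via `KafkaProperties::group_id`), using a cached admin client.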
    async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        let admin = Box::pin(SHARED_KAFKA_ADMIN.try_get_with_by_ref(
            &self.properties.connection,
            async {
                tracing::info!("build new kafka admin for {}", self.broker_address);
                Ok(Arc::new(
                    build_kafka_admin(&self.config, &self.properties).await?,
                ))
            },
        ))
        .await?;

        let group_ids = fragment_ids
            .iter()
            .map(|fragment_id| self.properties.group_id(*fragment_id))
            .collect::<Vec<_>>();
        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
        let res = admin
            .delete_groups(&group_ids, &AdminOptions::default())
            .await?;
        tracing::debug!(
            topic = self.topic,
            ?fragment_ids,
            "delete groups result: {res:?}"
        );
        Ok(())
    }
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    async fn new(
        properties: KafkaProperties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<KafkaSplitEnumerator> {
        let mut config = rdkafka::ClientConfig::new();
        let common_props = &properties.common;

        let broker_address = properties.connection.brokers.clone();
        let topic = common_props.topic.clone();
        config.set("bootstrap.servers", &broker_address);
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);
        let mut scan_start_offset = match properties
            .scan_startup_mode
            .as_ref()
            .map(|s| s.to_lowercase())
            .as_deref()
        {
            Some("earliest") => KafkaEnumeratorOffset::Earliest,
            Some("latest") => KafkaEnumeratorOffset::Latest,
            None => KafkaEnumeratorOffset::Earliest,
            _ => bail!(
                "property `scan_startup_mode` only supports `earliest` and `latest`, or can be left empty"
            ),
        };
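        // A user-provided startup timestamp (`time_offset`, in epoch milliseconds) overrides the
        // `scan_startup_mode` chosen above; both typically come from the source's WITH options
        // (e.g. `scan.startup.mode`), with the exact option names defined by the serde
        // attributes on `KafkaProperties`.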

        if let Some(s) = &properties.time_offset {
            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
        }

        let mut client: Option<Arc<KafkaConsumer>> = None;
        SHARED_KAFKA_CONSUMER
            .entry_by_ref(&properties.connection)
            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
                if let Some(entry) = maybe_entry {
                    let entry_value = entry.into_value();
                    if let Some(client_) = entry_value.upgrade() {
                        // return if the client is already built
                        tracing::info!("reuse existing kafka client for {}", broker_address);
                        client = Some(client_);
                        return Ok(Op::Nop);
                    }
                }
                tracing::info!("build new kafka client for {}", broker_address);
                client = Some(build_kafka_client(&config, &properties).await?);
                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
            })
            .await?;

        Ok(Self {
            context,
            broker_address,
            topic,
            client: client.unwrap(),
            start_offset: scan_start_offset,
            stop_offset: KafkaEnumeratorOffset::None,
            sync_call_timeout: properties.common.sync_call_timeout,
            high_watermark_metrics: HashMap::new(),
            properties,
            config,
        })
    }

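    /// Lists one [`KafkaSplit`] per partition of the topic. Start offsets are resolved
    /// according to `self.start_offset`; stop offsets stay `None` here, since streaming reads
    /// are unbounded.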
    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
        let mut start_offsets = self
            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let mut stop_offsets = self
            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let ret: Vec<_> = topic_partitions
            .into_iter()
            .map(|partition| KafkaSplit {
                topic: self.topic.clone(),
                partition,
                start_offset: start_offsets.remove(&partition).unwrap(),
                stop_offset: stop_offsets.remove(&partition).unwrap(),
            })
            .collect();

        Ok(ret)
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}

async fn build_kafka_client(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<Arc<KafkaConsumer>> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaConsumer = config.create_with_context(client_ctx).await?;

    // Note that before any SASL/OAUTHBEARER broker connection can succeed, the application must
    // call rd_kafka_oauthbearer_set_token() once, either directly or, more typically, by
    // invoking rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc., in order
    // to cause retrieval of an initial token to occur.
    // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf
    if properties.connection.is_aws_msk_iam() {
        #[cfg(not(madsim))]
        client.poll(Duration::from_secs(10)); // note: this is a blocking call
        #[cfg(madsim)]
        client.poll(Duration::from_secs(10)).await;
    }
    Ok(Arc::new(client))
}

async fn build_kafka_admin(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<KafkaAdmin> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
    // `AdminClient` calls `start_poll_thread` on creation, so an additional poll does not seem
    // necessary here (and there is currently no API for it anyway).
    Ok(client)
}

impl KafkaSplitEnumerator {
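    /// Fetches the low/high watermark (i.e. the smallest/largest offset available for reading)
    /// of each partition and reports the high watermark to the source metrics.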
    async fn get_watermarks(
        &mut self,
        partitions: &[i32],
    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
        let mut map = HashMap::new();
        for partition in partitions {
            let (low, high) = self
                .client
                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                .await?;
            self.report_high_watermark(*partition, high);
            map.insert(*partition, (low, high));
        }
        tracing::debug!("fetch kafka watermarks: {map:?}");
        Ok(map)
    }

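    /// Enumerates one split per partition for a bounded (batch) scan, using the provided
    /// start/stop timestamps (epoch milliseconds) to narrow the offset range when given,
    /// otherwise falling back to the partitions' low/high watermarks.
    ///
    /// A minimal usage sketch (not a doctest; it assumes you already have suitable
    /// `KafkaProperties` and an enumerator context at hand, and that hypothetical
    /// `start_ms`/`stop_ms` values are in scope):
    ///
    /// ```ignore
    /// let mut enumerator = KafkaSplitEnumerator::new(props, context).await?;
    /// let splits = enumerator
    ///     .list_splits_batch(Some(start_ms), Some(stop_ms))
    ///     .await?;
    /// for split in splits {
    ///     println!(
    ///         "partition {}: offsets {:?}..{:?}",
    ///         split.partition, split.start_offset, split.stop_offset
    ///     );
    /// }
    /// ```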
    pub async fn list_splits_batch(
        &mut self,
        expect_start_timestamp_millis: Option<i64>,
        expect_stop_timestamp_millis: Option<i64>,
    ) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        // "Watermark" here has nothing to do with watermarks in stream processing; it simply
        // means the smallest/largest offset available for reading.
        let mut watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;

        // Compute the start and end offset for each partition from the given timestamps.
        // If a timestamp is `None`, we use the low/high watermark as the start/end offset;
        // if it is provided, we use the corresponding offset, clamped by the watermarks, to
        // narrow down the range.
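        // For example (made-up numbers): for a partition with watermarks (low, high) = (5, 100),
        // no start timestamp gives a start offset of `low - 1 = 4` (offsets are recorded as
        // "last consumed"; see `fetch_offset_for_time`), and a stop-timestamp lookup that
        // resolves to 80 gives a stop offset of `min(80, high) = 80`.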
        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        Ok(topic_partitions
            .iter()
            .map(|partition| {
                let (low, high) = watermarks.remove(partition).unwrap();
                let start_offset = {
                    let earliest_offset = low - 1;
                    let start = expect_start_offset
                        .as_mut()
                        .map(|m| m.remove(partition).flatten().unwrap_or(earliest_offset))
                        .unwrap_or(earliest_offset);
                    i64::max(start, earliest_offset)
                };
                let stop_offset = {
                    let stop = expect_stop_offset
                        .as_mut()
                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
                        .unwrap_or(Some(high))
                        .unwrap_or(high);
                    i64::min(stop, high)
                };

                if start_offset > stop_offset {
                    tracing::warn!(
                        "Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
                        self.topic,
                        partition,
                        start_offset,
                        stop_offset
                    );
                }
                KafkaSplit {
                    topic: self.topic.clone(),
                    partition: *partition,
                    start_offset: Some(start_offset),
                    stop_offset: Some(stop_offset),
                }
            })
            .collect::<Vec<KafkaSplit>>())
    }

    async fn fetch_stop_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.stop_offset {
            KafkaEnumeratorOffset::Earliest => unreachable!(),
            KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (_, high_watermark) = watermarks.get(partition).unwrap();
                    map.insert(*partition, Some(*high_watermark));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

    async fn fetch_start_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.start_offset {
            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
                    let offset = match self.start_offset {
                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
                        _ => unreachable!(),
                    };
                    map.insert(*partition, Some(offset));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

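    /// Resolves, for each partition, the offset of the first message at or after `time`
    /// (epoch milliseconds), converted to the "last consumed offset" convention (i.e. minus 1).
    /// `Offset::End` and `Offset::Invalid` fall back to the high watermark, `Offset::Beginning`
    /// to the low watermark.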
    async fn fetch_offset_for_time(
        &self,
        partitions: &[i32],
        time: i64,
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        let mut tpl = TopicPartitionList::new();

        for partition in partitions {
            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
        }

        let offsets = self
            .client
            .offsets_for_times(tpl, self.sync_call_timeout)
            .await?;

        let mut result = HashMap::with_capacity(partitions.len());

        for elem in offsets.elements_for_topic(self.topic.as_str()) {
            match elem.offset() {
                Offset::Offset(offset) => {
                    // XXX(rc): currently in RW source, `offset` means the last consumed offset, so we need to subtract 1
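                    // e.g. if `offsets_for_times` reports offset 42 for a partition, we record
                    // 41 so that consumption resumes from 42.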
                    result.insert(elem.partition(), Some(offset - 1));
                }
                Offset::End => {
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = self.context.info.source_id,
                        "no message found before timestamp {} (ms) for partition {}, start from latest",
                        time,
                        elem.partition()
                    );
                    result.insert(elem.partition(), Some(high_watermark - 1)); // align to Latest
                }
                Offset::Invalid => {
                    // Special case for the madsim test: a real Kafka returns `Offset::End` when
                    // the timestamp is later than the latest message in the partition, but madsim
                    // returns `Offset::Invalid`, so we align to Latest here as well.
                    tracing::info!(
                        source_id = self.context.info.source_id,
                        "got invalid offset for partition {} at timestamp {}, align to latest",
                        elem.partition(),
                        time
                    );
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    result.insert(elem.partition(), Some(high_watermark - 1)); // align to Latest
                }
                Offset::Beginning => {
                    let (low, _) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = self.context.info.source_id,
                        "all messages in partition {} are after timestamp {} (ms), start from earliest",
                        elem.partition(),
                        time,
                    );
                    result.insert(elem.partition(), Some(low - 1)); // align to Earliest
                }
                err_offset @ Offset::Stored | err_offset @ Offset::OffsetTail(_) => {
                    tracing::error!(
                        source_id = self.context.info.source_id,
                        "got invalid offset for partition {}: {err_offset:?}",
                        elem.partition(),
                        err_offset = err_offset,
                    );
                    return Err(KafkaError::OffsetFetch(RDKafkaErrorCode::NoOffset));
                }
            }
        }

        Ok(result)
    }

    #[inline]
    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
        let high_watermark_metrics =
            self.high_watermark_metrics
                .entry(partition)
                .or_insert_with(|| {
                    self.context
                        .metrics
                        .high_watermark
                        .with_guarded_label_values(&[
                            &self.context.info.source_id.to_string(),
                            &partition.to_string(),
                        ])
                });
        high_watermark_metrics.set(offset);
    }

    pub async fn check_reachability(&self) -> ConnectorResult<()> {
        let _ = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;
        Ok(())
    }

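    /// Fetches the partition ids of the topic via a metadata request; errors if the topic is
    /// unknown or reports no partitions.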
    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
        // for now, we only support one topic
        let metadata = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;

        let topic_meta = match metadata.topics() {
            [meta] => meta,
            _ => bail!("topic {} not found", self.topic),
        };

        if topic_meta.partitions().is_empty() {
            bail!("topic {} not found", self.topic);
        }

        Ok(topic_meta
            .partitions()
            .iter()
            .map(|partition| partition.id())
            .collect())
    }
541}