risingwave_connector/source/kafka/enumerator.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use moka::ops::compute::Op;
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
use risingwave_common::bail;
use risingwave_common::metrics::LabelGuardedIntGauge;

use crate::connector_common::read_kafka_log_level;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::SourceEnumeratorContextRef;
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
    RwConsumerContext,
};

type KafkaConsumer = BaseConsumer<RwConsumerContext>;
type KafkaAdmin = AdminClient<RwConsumerContext>;

/// The consumer client is shared, and the cache doesn't manage its lifecycle, so we store `Weak`
/// references and apply no eviction policy.
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());
/// The admin client is short-lived, so we store `Arc` and set a time-to-idle eviction policy.
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
    LazyLock::new(|| {
        moka::future::Cache::builder()
            .time_to_idle(Duration::from_secs(5 * 60))
            .build()
    });
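
// A minimal sketch (illustrative only) of how the shared consumer cache above is meant to be
// used: look the connection up, try to upgrade the stored `Weak`, and rebuild on a miss. The
// real lookup in `SplitEnumerator::new` below uses `entry_by_ref` + `and_try_compute_with`
// instead, so the check-and-insert happens atomically per connection.
//
//     let client = match SHARED_KAFKA_CONSUMER.get(&properties.connection).await {
//         Some(weak) => weak.upgrade(), // `None` if every strong reference was dropped
//         None => None,
//     };
//     let client = match client {
//         Some(client) => client, // reuse the existing consumer
//         None => {
//             let client = build_kafka_client(&config, &properties).await?;
//             SHARED_KAFKA_CONSUMER
//                 .insert(properties.connection.clone(), Arc::downgrade(&client))
//                 .await;
//             client
//         }
//     };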

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
    Earliest,
    Latest,
    Timestamp(i64),
    None,
}

pub struct KafkaSplitEnumerator {
    context: SourceEnumeratorContextRef,
    broker_address: String,
    topic: String,
    client: Arc<KafkaConsumer>,
    start_offset: KafkaEnumeratorOffset,

    // maybe used in the future for batch processing
    stop_offset: KafkaEnumeratorOffset,

    sync_call_timeout: Duration,
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge>,

    properties: KafkaProperties,
    config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {
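    /// Deletes the Kafka consumer groups owned by the given fragments: one group id is derived
    /// per fragment via `self.properties.group_id(..)`, and the groups are removed through a
    /// shared admin client for this connection.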
    async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        let admin = SHARED_KAFKA_ADMIN
            .try_get_with_by_ref(&self.properties.connection, async {
                tracing::info!("build new kafka admin for {}", self.broker_address);
                Ok(Arc::new(
                    build_kafka_admin(&self.config, &self.properties).await?,
                ))
            })
            .await?;

        let group_ids = fragment_ids
            .iter()
            .map(|fragment_id| self.properties.group_id(*fragment_id))
            .collect::<Vec<_>>();
        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
        let res = admin
            .delete_groups(&group_ids, &AdminOptions::default())
            .await?;
        tracing::debug!(
            topic = self.topic,
            ?fragment_ids,
            "delete groups result: {res:?}"
        );
        Ok(())
    }
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    async fn new(
        properties: KafkaProperties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<KafkaSplitEnumerator> {
        let mut config = rdkafka::ClientConfig::new();
        let common_props = &properties.common;

        let broker_address = properties.connection.brokers.clone();
        let topic = common_props.topic.clone();
        config.set("bootstrap.servers", &broker_address);
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);
        let mut scan_start_offset = match properties
            .scan_startup_mode
            .as_ref()
            .map(|s| s.to_lowercase())
            .as_deref()
        {
            Some("earliest") => KafkaEnumeratorOffset::Earliest,
            Some("latest") => KafkaEnumeratorOffset::Latest,
            None => KafkaEnumeratorOffset::Earliest,
            _ => bail!(
                "property `scan_startup_mode` only supports `earliest` and `latest`, or leaving it empty"
            ),
        };

        if let Some(s) = &properties.time_offset {
            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
        }

        let mut client: Option<Arc<KafkaConsumer>> = None;
        SHARED_KAFKA_CONSUMER
            .entry_by_ref(&properties.connection)
            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
                if let Some(entry) = maybe_entry {
                    let entry_value = entry.into_value();
                    if let Some(client_) = entry_value.upgrade() {
                        // return if the client is already built
                        tracing::info!("reuse existing kafka client for {}", broker_address);
                        client = Some(client_);
                        return Ok(Op::Nop);
                    }
                }
                tracing::info!("build new kafka client for {}", broker_address);
                client = Some(build_kafka_client(&config, &properties).await?);
                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
            })
            .await?;

        Ok(Self {
            context,
            broker_address,
            topic,
            client: client.unwrap(),
            start_offset: scan_start_offset,
            stop_offset: KafkaEnumeratorOffset::None,
            sync_call_timeout: properties.common.sync_call_timeout,
            high_watermark_metrics: HashMap::new(),
            properties,
            config,
        })
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
        let mut start_offsets = self
            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let mut stop_offsets = self
            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let ret: Vec<_> = topic_partitions
            .into_iter()
            .map(|partition| KafkaSplit {
                topic: self.topic.clone(),
                partition,
                start_offset: start_offsets.remove(&partition).unwrap(),
                stop_offset: stop_offsets.remove(&partition).unwrap(),
            })
            .collect();

        Ok(ret)
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}

async fn build_kafka_client(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<Arc<KafkaConsumer>> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaConsumer = config.create_with_context(client_ctx).await?;

    // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call
    // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either
    // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval
    // of an initial token to occur.
    // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf
    if properties.connection.is_aws_msk_iam() {
        #[cfg(not(madsim))]
        client.poll(Duration::from_secs(10)); // note: this is a blocking call
        #[cfg(madsim)]
        client.poll(Duration::from_secs(10)).await;
    }
    Ok(Arc::new(client))
}
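
/// Builds a Kafka admin client with the same broker-rewrite and authentication context as the
/// consumer client built above.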
async fn build_kafka_admin(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<KafkaAdmin> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
    // `AdminClient` calls `start_poll_thread` on creation, so an additional poll seems
    // unnecessary (and there is currently no API for it).
    Ok(client)
}

impl KafkaSplitEnumerator {
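    /// Fetches the (low, high) watermark, i.e. the smallest/largest offset available for
    /// reading, for each partition, and reports the high watermark metric as a side effect.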
    async fn get_watermarks(
        &mut self,
        partitions: &[i32],
    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
        let mut map = HashMap::new();
        for partition in partitions {
            let (low, high) = self
                .client
                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                .await?;
            self.report_high_watermark(*partition, high);
            map.insert(*partition, (low, high));
        }
        tracing::debug!("fetch kafka watermarks: {map:?}");
        Ok(map)
    }

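    /// Lists splits for a bounded (batch) read: for each partition, the start/stop offsets are
    /// resolved from the optional start/stop timestamps and then clamped to the partition's
    /// low/high watermarks.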
    pub async fn list_splits_batch(
        &mut self,
        expect_start_timestamp_millis: Option<i64>,
        expect_stop_timestamp_millis: Option<i64>,
    ) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        // Watermark here has nothing to do with watermark in streaming processing. Watermark
        // here means smallest/largest offset available for reading.
        let mut watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;

        // Resolve the start and end offset for each partition from the given timestamps.
        // If a timestamp is None, the low/high watermark is used as the start/end offset.
        // If a timestamp is provided, the watermarks are still used to narrow down the range.
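        // Illustrative numbers: with watermarks (low = 5, high = 100), if `offsets_for_times`
        // resolves the start timestamp to offset 42, `fetch_offset_for_time` records 41 (the
        // "last consumed" convention), and the split starts at max(41, low - 1) = 41, i.e.
        // consumption begins at offset 42. Without a stop timestamp the split stops at high = 100.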
        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        Ok(topic_partitions
            .iter()
            .map(|partition| {
                let (low, high) = watermarks.remove(partition).unwrap();
                let start_offset = {
                    let earliest_offset = low - 1;
                    let start = expect_start_offset
                        .as_mut()
                        .map(|m| m.remove(partition).flatten().unwrap_or(earliest_offset))
                        .unwrap_or(earliest_offset);
                    i64::max(start, earliest_offset)
                };
                let stop_offset = {
                    let stop = expect_stop_offset
                        .as_mut()
                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
                        .unwrap_or(Some(high))
                        .unwrap_or(high);
                    i64::min(stop, high)
                };

                if start_offset > stop_offset {
                    tracing::warn!(
                        "Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
                        self.topic,
                        partition,
                        start_offset,
                        stop_offset
                    );
                }
                KafkaSplit {
                    topic: self.topic.clone(),
                    partition: *partition,
                    start_offset: Some(start_offset),
                    stop_offset: Some(stop_offset),
                }
            })
            .collect::<Vec<KafkaSplit>>())
    }

    async fn fetch_stop_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.stop_offset {
            KafkaEnumeratorOffset::Earliest => unreachable!(),
            KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (_, high_watermark) = watermarks.get(partition).unwrap();
                    map.insert(*partition, Some(*high_watermark));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

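    /// Note: offsets returned here follow the "last consumed offset" convention used by the RW
    /// source (see the comment in `fetch_offset_for_time`), so `Earliest` maps to `low - 1` and
    /// `Latest` maps to `high - 1`.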
    async fn fetch_start_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.start_offset {
            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
                    let offset = match self.start_offset {
                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
                        _ => unreachable!(),
                    };
                    map.insert(*partition, Some(offset));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

    async fn fetch_offset_for_time(
        &self,
        partitions: &[i32],
        time: i64,
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        let mut tpl = TopicPartitionList::new();

        for partition in partitions {
            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
        }

        let offsets = self
            .client
            .offsets_for_times(tpl, self.sync_call_timeout)
            .await?;

        let mut result = HashMap::with_capacity(partitions.len());

        for elem in offsets.elements_for_topic(self.topic.as_str()) {
            match elem.offset() {
                Offset::Offset(offset) => {
                    // XXX(rc): currently in RW source, `offset` means the last consumed offset, so we need to subtract 1
                    result.insert(elem.partition(), Some(offset - 1));
                }
                Offset::End => {
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = self.context.info.source_id,
                        "no message found before timestamp {} (ms) for partition {}, starting from latest",
                        time,
                        elem.partition()
                    );
                    result.insert(elem.partition(), Some(high_watermark - 1)); // align to Latest
                }
                Offset::Invalid => {
                    // Special case for madsim tests.
                    // A real Kafka returns `Offset::End` when the timestamp is later than the latest message in the partition,
                    // but madsim returns `Offset::Invalid`, so we align to Latest here as well.
                    tracing::info!(
                        source_id = self.context.info.source_id,
                        "got invalid offset for partition {} at timestamp {}, align to latest",
                        elem.partition(),
                        time
                    );
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    result.insert(elem.partition(), Some(high_watermark - 1)); // align to Latest
                }
                Offset::Beginning => {
                    let (low, _) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = self.context.info.source_id,
                        "all messages in partition {} are after timestamp {} (ms), starting from earliest",
                        elem.partition(),
                        time,
                    );
                    result.insert(elem.partition(), Some(low - 1)); // align to Earliest
                }
                err_offset @ Offset::Stored | err_offset @ Offset::OffsetTail(_) => {
                    tracing::error!(
                        source_id = self.context.info.source_id,
                        "got invalid offset for partition {}: {err_offset:?}",
                        elem.partition(),
                        err_offset = err_offset,
                    );
                    return Err(KafkaError::OffsetFetch(RDKafkaErrorCode::NoOffset));
                }
            }
        }

        Ok(result)
    }

    #[inline]
    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
        let high_watermark_metrics =
            self.high_watermark_metrics
                .entry(partition)
                .or_insert_with(|| {
                    self.context
                        .metrics
                        .high_watermark
                        .with_guarded_label_values(&[
                            &self.context.info.source_id.to_string(),
                            &partition.to_string(),
                        ])
                });
        high_watermark_metrics.set(offset);
    }

    pub async fn check_reachability(&self) -> bool {
        self.client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await
            .is_ok()
    }

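    /// Fetches the partition ids of the configured topic, bailing out if the topic cannot be
    /// found in the metadata or has no partitions.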
    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
        // for now, we only support one topic
        let metadata = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;

        let topic_meta = match metadata.topics() {
            [meta] => meta,
            _ => bail!("topic {} not found", self.topic),
        };

        if topic_meta.partitions().is_empty() {
            bail!("topic {} not found", self.topic);
        }

        Ok(topic_meta
            .partitions()
            .iter()
            .map(|partition| partition.id())
            .collect())
    }
}