risingwave_connector/source/kafka/
enumerator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::{Arc, LazyLock, Weak};
17use std::time::Duration;
18
19use anyhow::{Context, anyhow};
20use async_trait::async_trait;
21use moka::future::Cache as MokaCache;
22use moka::ops::compute::Op;
23use rdkafka::admin::{AdminClient, AdminOptions};
24use rdkafka::consumer::{BaseConsumer, Consumer};
25use rdkafka::error::{KafkaError, KafkaResult};
26use rdkafka::types::RDKafkaErrorCode;
27use rdkafka::{ClientConfig, Offset, TopicPartitionList};
28use risingwave_common::bail;
29use risingwave_common::id::FragmentId;
30use risingwave_common::metrics::LabelGuardedIntGauge;
31use thiserror_ext::AsReport;
32
33use crate::connector_common::read_kafka_log_level;
34use crate::error::{ConnectorError, ConnectorResult};
35use crate::source::SourceEnumeratorContextRef;
36use crate::source::base::SplitEnumerator;
37use crate::source::kafka::split::KafkaSplit;
38use crate::source::kafka::{
39    KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
40    RwConsumerContext,
41};
42
/// Consumer client used by the enumerator for metadata / watermark / offset queries.
type KafkaConsumer = BaseConsumer<RwConsumerContext>;
/// Admin client used for consumer-group deletion (see `drop_consumer_groups`).
type KafkaAdmin = AdminClient<RwConsumerContext>;

/// Consumer client is shared, and the cache doesn't manage the lifecycle, so we store `Weak` and no eviction.
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());
/// Admin client is short-lived, so we store `Arc` and set a time-to-idle eviction policy.
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
    LazyLock::new(|| {
        moka::future::Cache::builder()
            // Drop an admin client after 5 minutes without use.
            .time_to_idle(Duration::from_secs(5 * 60))
            .build()
    });
56
/// How the enumerator resolves a split's start (or stop) offset.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
    /// Resolve to the partition's low watermark (earliest readable offset).
    Earliest,
    /// Resolve to the partition's high watermark (latest offset).
    Latest,
    /// Resolve from a timestamp in milliseconds via `offsets_for_times`.
    Timestamp(i64),
    /// No explicit bound: the resolved per-partition offsets are left as `None`.
    None,
}
64
/// Enumerates Kafka partitions of a single topic into [`KafkaSplit`]s.
pub struct KafkaSplitEnumerator {
    /// Enumerator context (source info and metrics registry).
    context: SourceEnumeratorContextRef,
    /// Bootstrap broker list; also used in log and error messages.
    broker_address: String,
    /// The (single) topic this enumerator inspects.
    topic: String,
    /// Shared consumer client used for metadata / watermark / offset queries.
    client: Arc<KafkaConsumer>,
    /// Offset mode newly discovered splits should start from.
    start_offset: KafkaEnumeratorOffset,

    // maybe used in the future for batch processing
    stop_offset: KafkaEnumeratorOffset,

    /// Timeout for synchronous-style client calls (metadata, watermarks, offsets).
    sync_call_timeout: Duration,
    /// Per-partition gauges reporting the latest observed high watermark.
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge>,

    /// Original source properties; kept for group-id derivation and admin building.
    properties: KafkaProperties,
    /// Fully assembled client config, reused when lazily building the admin client.
    config: rdkafka::ClientConfig,
}
81
82impl KafkaSplitEnumerator {
83    async fn drop_consumer_groups(&self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
84        let admin = Box::pin(SHARED_KAFKA_ADMIN.try_get_with_by_ref(
85            &self.properties.connection,
86            async {
87                tracing::info!("build new kafka admin for {}", self.broker_address);
88                Ok(Arc::new(
89                    build_kafka_admin(&self.config, &self.properties).await?,
90                ))
91            },
92        ))
93        .await?;
94
95        let group_ids = fragment_ids
96            .iter()
97            .map(|fragment_id| self.properties.group_id(*fragment_id))
98            .collect::<Vec<_>>();
99        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
100        let res = admin
101            .delete_groups(&group_ids, &AdminOptions::default())
102            .await?;
103        tracing::debug!(
104            topic = self.topic,
105            ?fragment_ids,
106            "delete groups result: {res:?}"
107        );
108        Ok(())
109    }
110}
111
#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    /// Builds the enumerator: assembles the rdkafka client config from the
    /// source properties, resolves the startup offset mode, and obtains a
    /// (possibly shared) consumer client for metadata queries.
    async fn new(
        properties: KafkaProperties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<KafkaSplitEnumerator> {
        let mut config = rdkafka::ClientConfig::new();
        let common_props = &properties.common;

        let broker_address = properties.connection.brokers.clone();
        let topic = common_props.topic.clone();
        config.set("bootstrap.servers", &broker_address);
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);
        // The meta-side split enumerator does not export librdkafka native stats, so disable
        // periodic statistics callbacks here even if the source properties enable them for
        // compute-side readers.
        config.set("statistics.interval.ms", "0");
        // Map `scan_startup_mode` (case-insensitive) to an offset mode,
        // defaulting to `Earliest` when the property is absent.
        let mut scan_start_offset = match properties
            .scan_startup_mode
            .as_ref()
            .map(|s| s.to_lowercase())
            .as_deref()
        {
            Some("earliest") => KafkaEnumeratorOffset::Earliest,
            Some("latest") => KafkaEnumeratorOffset::Latest,
            None => KafkaEnumeratorOffset::Earliest,
            _ => bail!(
                "properties `scan_startup_mode` only supports earliest and latest or leaving it empty"
            ),
        };

        // An explicit startup timestamp (ms) overrides the mode above.
        if let Some(s) = &properties.time_offset {
            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
        }

        // Atomically look up or (re)build the shared consumer. The cache stores
        // `Weak` handles, so an entry may exist but point to a dropped client —
        // in that case a fresh client is built and the entry replaced.
        let mut client: Option<Arc<KafkaConsumer>> = None;
        SHARED_KAFKA_CONSUMER
            .entry_by_ref(&properties.connection)
            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
                if let Some(entry) = maybe_entry {
                    let entry_value = entry.into_value();
                    if let Some(client_) = entry_value.upgrade() {
                        // return if the client is already built
                        tracing::info!("reuse existing kafka client for {}", broker_address);
                        client = Some(client_);
                        return Ok(Op::Nop);
                    }
                }
                tracing::info!("build new kafka client for {}", broker_address);
                client = Some(build_kafka_client(&config, &properties).await?);
                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
            })
            .await?;

        Ok(Self {
            context,
            broker_address,
            topic,
            // `client` is always set by the compute closure above (both branches).
            client: client.unwrap(),
            start_offset: scan_start_offset,
            stop_offset: KafkaEnumeratorOffset::None,
            sync_call_timeout: properties.common.sync_call_timeout,
            high_watermark_metrics: HashMap::new(),
            properties,
            config,
        })
    }

    /// Discovers all partitions of the topic and returns one split per
    /// partition, with start/stop offsets resolved per the configured modes.
    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
        // `KafkaSplitEnumerator` uses `BaseConsumer`, which does not have a background polling
        // thread. Poll once per `list_splits` invocation so meta's periodic source-manager tick
        // can serve queued callbacks like librdkafka statistics events.
        if let Some(Err(poll_err)) = {
            // Under madsim, the rdkafka shim exposes `poll` as an async fn.
            #[cfg(not(madsim))]
            {
                self.client.poll(Duration::ZERO)
            }
            #[cfg(madsim)]
            {
                self.client.poll(Duration::ZERO).await
            }
        } {
            // A failed poll is non-fatal for split discovery; just log it.
            tracing::warn!(
                error = %poll_err.as_report(),
                topic = self.topic,
                broker_address = self.broker_address,
                "failed to poll kafka client");
        }

        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
        let mut start_offsets = self
            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let mut stop_offsets = self
            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        // Both offset maps were built from `topic_partitions`, so `unwrap` on
        // `remove` cannot fail here.
        let ret: Vec<_> = topic_partitions
            .into_iter()
            .map(|partition| KafkaSplit {
                topic: self.topic.clone(),
                partition,
                start_offset: start_offsets.remove(&partition).unwrap(),
                stop_offset: stop_offsets.remove(&partition).unwrap(),
            })
            .collect();

        Ok(ret)
    }

    /// Cleans up consumer groups of dropped fragments (best-effort).
    async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    /// Cleans up consumer groups once backfill finishes (best-effort).
    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}
247
/// Builds the enumerator's consumer client with the PrivateLink broker-rewrite
/// map and (optionally) AWS MSK IAM authentication wired into its context.
///
/// Returns the client wrapped in `Arc` so it can be shared via
/// [`SHARED_KAFKA_CONSUMER`].
async fn build_kafka_client(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<Arc<KafkaConsumer>> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaConsumer = config.create_with_context(client_ctx).await?;

    // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call
    // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either
    // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval
    // of an initial token to occur.
    // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf
    if properties.connection.is_aws_msk_iam() {
        // The poll result is intentionally ignored: the call only serves to
        // trigger the initial OAUTHBEARER token retrieval.
        #[cfg(not(madsim))]
        client.poll(Duration::from_secs(10)); // note: this is a blocking call
        #[cfg(madsim)]
        client.poll(Duration::from_secs(10)).await;
    }
    Ok(Arc::new(client))
}
276async fn build_kafka_admin(
277    config: &ClientConfig,
278    properties: &KafkaProperties,
279) -> ConnectorResult<KafkaAdmin> {
280    let ctx_common = KafkaContextCommon::new(
281        properties.privatelink_common.broker_rewrite_map.clone(),
282        None,
283        None,
284        properties.aws_auth_props.clone(),
285        properties.connection.is_aws_msk_iam(),
286    )
287    .await?;
288    let client_ctx = RwConsumerContext::new(ctx_common);
289    let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
290    // AdminClient calls start_poll_thread on creation, so the additional poll seems not needed. (And currently no API for this.)
291    Ok(client)
292}
293
294impl KafkaSplitEnumerator {
295    async fn get_watermarks(
296        &mut self,
297        partitions: &[i32],
298    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
299        let mut map = HashMap::new();
300        for partition in partitions {
301            let (low, high) = self
302                .client
303                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
304                .await?;
305            self.report_high_watermark(*partition, high);
306            map.insert(*partition, (low, high));
307        }
308        tracing::debug!("fetch kafka watermarks: {map:?}");
309        Ok(map)
310    }
311
312    pub async fn list_splits_batch(
313        &mut self,
314        expect_start_timestamp_millis: Option<i64>,
315        expect_stop_timestamp_millis: Option<i64>,
316    ) -> ConnectorResult<Vec<KafkaSplit>> {
317        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
318            format!(
319                "failed to fetch metadata from kafka ({})",
320                self.broker_address
321            )
322        })?;
323
324        // Watermark here has nothing to do with watermark in streaming processing. Watermark
325        // here means smallest/largest offset available for reading.
326        let mut watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
327
328        // here we are getting the start offset and end offset for each partition with the given
329        // timestamp if the timestamp is None, we will use the low watermark and high
330        // watermark as the start and end offset if the timestamp is provided, we will use
331        // the watermark to narrow down the range
332        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
333            Some(
334                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
335                    .await?,
336            )
337        } else {
338            None
339        };
340
341        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
342            Some(
343                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
344                    .await?,
345            )
346        } else {
347            None
348        };
349
350        Ok(topic_partitions
351            .iter()
352            .map(|partition| {
353                let (low, high) = watermarks.remove(partition).unwrap();
354                let start_offset = {
355                    let earliest_offset = low - 1;
356                    let start = expect_start_offset
357                        .as_mut()
358                        .map(|m| m.remove(partition).flatten().unwrap_or(earliest_offset))
359                        .unwrap_or(earliest_offset);
360                    i64::max(start, earliest_offset)
361                };
362                let stop_offset = {
363                    let stop = expect_stop_offset
364                        .as_mut()
365                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
366                        .unwrap_or(Some(high))
367                        .unwrap_or(high);
368                    i64::min(stop, high)
369                };
370
371                if start_offset > stop_offset {
372                    tracing::warn!(
373                        "Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
374                        self.topic,
375                        partition,
376                        start_offset,
377                        stop_offset
378                    );
379                }
380                KafkaSplit {
381                    topic: self.topic.clone(),
382                    partition: *partition,
383                    start_offset: Some(start_offset),
384                    stop_offset: Some(stop_offset),
385                }
386            })
387            .collect::<Vec<KafkaSplit>>())
388    }
389
390    async fn fetch_stop_offset(
391        &self,
392        partitions: &[i32],
393        watermarks: &HashMap<i32, (i64, i64)>,
394    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
395        match self.stop_offset {
396            KafkaEnumeratorOffset::Earliest => unreachable!(),
397            KafkaEnumeratorOffset::Latest => {
398                let mut map = HashMap::new();
399                for partition in partitions {
400                    let (_, high_watermark) = watermarks.get(partition).unwrap();
401                    map.insert(*partition, Some(*high_watermark));
402                }
403                Ok(map)
404            }
405            KafkaEnumeratorOffset::Timestamp(time) => {
406                self.fetch_offset_for_time(partitions, time, watermarks)
407                    .await
408            }
409            KafkaEnumeratorOffset::None => partitions
410                .iter()
411                .map(|partition| Ok((*partition, None)))
412                .collect(),
413        }
414    }
415
416    async fn fetch_start_offset(
417        &self,
418        partitions: &[i32],
419        watermarks: &HashMap<i32, (i64, i64)>,
420    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
421        match self.start_offset {
422            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
423                let mut map = HashMap::new();
424                for partition in partitions {
425                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
426                    let offset = match self.start_offset {
427                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
428                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
429                        _ => unreachable!(),
430                    };
431                    map.insert(*partition, Some(offset));
432                }
433                Ok(map)
434            }
435            KafkaEnumeratorOffset::Timestamp(time) => {
436                self.fetch_offset_for_time(partitions, time, watermarks)
437                    .await
438            }
439            KafkaEnumeratorOffset::None => partitions
440                .iter()
441                .map(|partition| Ok((*partition, None)))
442                .collect(),
443        }
444    }
445
446    async fn fetch_offset_for_time(
447        &self,
448        partitions: &[i32],
449        time: i64,
450        watermarks: &HashMap<i32, (i64, i64)>,
451    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
452        let mut tpl = TopicPartitionList::new();
453
454        for partition in partitions {
455            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
456        }
457
458        let offsets = self
459            .client
460            .offsets_for_times(tpl, self.sync_call_timeout)
461            .await?;
462
463        let mut result = HashMap::with_capacity(partitions.len());
464
465        for elem in offsets.elements_for_topic(self.topic.as_str()) {
466            match elem.offset() {
467                Offset::Offset(offset) => {
468                    // XXX(rc): currently in RW source, `offset` means the last consumed offset, so we need to subtract 1
469                    result.insert(elem.partition(), Some(offset - 1));
470                }
471                Offset::End => {
472                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
473                    tracing::info!(
474                        source_id = %self.context.info.source_id,
475                        "no message found before timestamp {} (ms) for partition {}, start from latest",
476                        time,
477                        elem.partition()
478                    );
479                    result.insert(elem.partition(), Some(high_watermark - 1)); // align to Latest
480                }
481                Offset::Invalid => {
482                    // special case for madsim test
483                    // For a read Kafka, it returns `Offset::Latest` when the timestamp is later than the latest message in the partition
484                    // But in madsim, it returns `Offset::Invalid`
485                    // So we align to Latest here
486                    tracing::info!(
487                        source_id = %self.context.info.source_id,
488                        "got invalid offset for partition  {} at timestamp {}, align to latest",
489                        elem.partition(),
490                        time
491                    );
492                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
493                    result.insert(elem.partition(), Some(high_watermark - 1)); // align to Latest
494                }
495                Offset::Beginning => {
496                    let (low, _) = watermarks.get(&elem.partition()).unwrap();
497                    tracing::info!(
498                        source_id = %self.context.info.source_id,
499                        "all message in partition {} is after timestamp {} (ms), start from earliest",
500                        elem.partition(),
501                        time,
502                    );
503                    result.insert(elem.partition(), Some(low - 1)); // align to Earliest
504                }
505                err_offset @ Offset::Stored | err_offset @ Offset::OffsetTail(_) => {
506                    tracing::error!(
507                        source_id = %self.context.info.source_id,
508                        "got invalid offset for partition {}: {err_offset:?}",
509                        elem.partition(),
510                        err_offset = err_offset,
511                    );
512                    return Err(KafkaError::OffsetFetch(RDKafkaErrorCode::NoOffset));
513                }
514            }
515        }
516
517        Ok(result)
518    }
519
520    #[inline]
521    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
522        let high_watermark_metrics =
523            self.high_watermark_metrics
524                .entry(partition)
525                .or_insert_with(|| {
526                    self.context
527                        .metrics
528                        .high_watermark
529                        .with_guarded_label_values(&[
530                            &self.context.info.source_id.to_string(),
531                            &partition.to_string(),
532                        ])
533                });
534        high_watermark_metrics.set(offset);
535    }
536
537    pub async fn check_reachability(&self) -> ConnectorResult<()> {
538        let _ = self
539            .client
540            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
541            .await?;
542        Ok(())
543    }
544
545    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
546        // for now, we only support one topic
547        let metadata = self
548            .client
549            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
550            .await?;
551
552        let topic_meta = match metadata.topics() {
553            [meta] => meta,
554            _ => bail!("topic {} not found", self.topic),
555        };
556
557        if topic_meta.partitions().is_empty() {
558            bail!("topic {} not found", self.topic);
559        }
560
561        Ok(topic_meta
562            .partitions()
563            .iter()
564            .map(|partition| partition.id())
565            .collect())
566    }
567}