risingwave_connector/source/kafka/
enumerator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::{Arc, LazyLock, Weak};
17use std::time::Duration;
18
19use anyhow::{Context, anyhow};
20use async_trait::async_trait;
21use moka::future::Cache as MokaCache;
22use moka::ops::compute::Op;
23use rdkafka::admin::{AdminClient, AdminOptions};
24use rdkafka::consumer::BaseConsumer;
25#[cfg(not(madsim))]
26use rdkafka::consumer::Consumer;
27use rdkafka::error::{KafkaError, KafkaResult};
28use rdkafka::types::RDKafkaErrorCode;
29use rdkafka::{ClientConfig, Offset, TopicPartitionList};
30use risingwave_common::bail;
31use risingwave_common::id::FragmentId;
32use risingwave_common::metrics::LabelGuardedIntGauge;
33use thiserror_ext::AsReport;
34
35use crate::connector_common::read_kafka_log_level;
36use crate::error::{ConnectorError, ConnectorResult};
37use crate::source::SourceEnumeratorContextRef;
38use crate::source::base::SplitEnumerator;
39use crate::source::kafka::split::KafkaSplit;
40use crate::source::kafka::{
41    KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
42    RwConsumerContext,
43};
44
45type KafkaConsumer = BaseConsumer<RwConsumerContext>;
46type KafkaAdmin = AdminClient<RwConsumerContext>;
47
48/// Consumer client is shared, and the cache doesn't manage the lifecycle, so we store `Weak` and no eviction.
49pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
50    LazyLock::new(|| moka::future::Cache::builder().build());
51/// Admin client is short-lived, so we store `Arc` and sets a time-to-idle eviction policy.
52pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
53    LazyLock::new(|| {
54        moka::future::Cache::builder()
55            .time_to_idle(Duration::from_secs(5 * 60))
56            .build()
57    });
58
/// How the enumerator resolves the start or stop offset of each partition.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum KafkaEnumeratorOffset {
    /// Resolve from the partition's low watermark.
    Earliest,
    /// Resolve from the partition's high watermark.
    Latest,
    /// Resolve by looking up the offset for the wrapped timestamp (logged as milliseconds).
    Timestamp(i64),
    /// Do not resolve an offset; the split's offset is left unset.
    None,
}
66
/// Meta-side split enumerator for a single Kafka topic.
///
/// It discovers the topic's partitions and resolves a start/stop offset for each of them,
/// producing one `KafkaSplit` per partition.
pub struct KafkaSplitEnumerator {
    /// Enumerator context; provides the source id used in logs and the metrics registry.
    context: SourceEnumeratorContextRef,
    /// Broker list taken from the connection properties (`bootstrap.servers`).
    broker_address: String,
    /// The single topic this enumerator lists splits for.
    topic: String,
    /// Consumer used for metadata, watermark and offset lookups; shared through
    /// `SHARED_KAFKA_CONSUMER`.
    client: Arc<KafkaConsumer>,
    /// How the start offset of each split is resolved.
    start_offset: KafkaEnumeratorOffset,

    // maybe used in the future for batch processing
    stop_offset: KafkaEnumeratorOffset,

    /// Timeout passed to the metadata, watermark and offsets-for-times calls.
    sync_call_timeout: Duration,
    /// Per-partition gauges reporting the most recently fetched high watermark.
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge>,

    /// Original source properties; kept to derive group ids and build the admin client.
    properties: KafkaProperties,
    /// Client configuration built in `new`; reused when building the admin client.
    config: rdkafka::ClientConfig,
}
83
84impl KafkaSplitEnumerator {
85    async fn drop_consumer_groups(&self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
86        let admin = Box::pin(SHARED_KAFKA_ADMIN.try_get_with_by_ref(
87            &self.properties.connection,
88            async {
89                tracing::info!("build new kafka admin for {}", self.broker_address);
90                Ok(Arc::new(
91                    build_kafka_admin(&self.config, &self.properties).await?,
92                ))
93            },
94        ))
95        .await?;
96
97        let group_ids = fragment_ids
98            .iter()
99            .map(|fragment_id| self.properties.group_id(*fragment_id))
100            .collect::<Vec<_>>();
101        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
102        let res = admin
103            .delete_groups(&group_ids, &AdminOptions::default())
104            .await?;
105        tracing::debug!(
106            topic = self.topic,
107            ?fragment_ids,
108            "delete groups result: {res:?}"
109        );
110        Ok(())
111    }
112}
113
114#[async_trait]
115impl SplitEnumerator for KafkaSplitEnumerator {
116    type Properties = KafkaProperties;
117    type Split = KafkaSplit;
118
119    async fn new(
120        properties: KafkaProperties,
121        context: SourceEnumeratorContextRef,
122    ) -> ConnectorResult<KafkaSplitEnumerator> {
123        let mut config = rdkafka::ClientConfig::new();
124        let common_props = &properties.common;
125
126        let broker_address = properties.connection.brokers.clone();
127        let topic = common_props.topic.clone();
128        config.set("bootstrap.servers", &broker_address);
129        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
130        if let Some(log_level) = read_kafka_log_level() {
131            config.set_log_level(log_level);
132        }
133        properties.connection.set_security_properties(&mut config);
134        properties.set_client(&mut config);
135        // The meta-side split enumerator does not export librdkafka native stats, so disable
136        // periodic statistics callbacks here even if the source properties enable them for
137        // compute-side readers.
138        config.set("statistics.interval.ms", "0");
139        let mut scan_start_offset = match properties
140            .scan_startup_mode
141            .as_ref()
142            .map(|s| s.to_lowercase())
143            .as_deref()
144        {
145            Some("earliest") => KafkaEnumeratorOffset::Earliest,
146            Some("latest") => KafkaEnumeratorOffset::Latest,
147            None => KafkaEnumeratorOffset::Earliest,
148            _ => bail!(
149                "properties `scan_startup_mode` only supports earliest and latest or leaving it empty"
150            ),
151        };
152
153        if let Some(s) = &properties.time_offset {
154            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
155            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
156        }
157
158        let mut client: Option<Arc<KafkaConsumer>> = None;
159        SHARED_KAFKA_CONSUMER
160            .entry_by_ref(&properties.connection)
161            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
162                if let Some(entry) = maybe_entry {
163                    let entry_value = entry.into_value();
164                    if let Some(client_) = entry_value.upgrade() {
165                        // return if the client is already built
166                        tracing::info!("reuse existing kafka client for {}", broker_address);
167                        client = Some(client_);
168                        return Ok(Op::Nop);
169                    }
170                }
171                tracing::info!("build new kafka client for {}", broker_address);
172                client = Some(build_kafka_client(&config, &properties).await?);
173                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
174            })
175            .await?;
176
177        Ok(Self {
178            context,
179            broker_address,
180            topic,
181            client: client.unwrap(),
182            start_offset: scan_start_offset,
183            stop_offset: KafkaEnumeratorOffset::None,
184            sync_call_timeout: properties.common.sync_call_timeout,
185            high_watermark_metrics: HashMap::new(),
186            properties,
187            config,
188        })
189    }
190
191    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
192        // `KafkaSplitEnumerator` uses `BaseConsumer`, which does not have a background polling
193        // thread. Poll once per `list_splits` invocation so meta's periodic source-manager tick
194        // can serve queued callbacks like librdkafka statistics events.
195        //
196        // This meta-side enumerator does not have a fragment id, so it cannot derive the
197        // compute-side consumer group id. Polling this no-group client may therefore return
198        // `UnknownGroup`, which is expected and intentionally filtered below. Other poll errors
199        // are still logged as warnings.
200        if let Some(Err(poll_err)) = {
201            #[cfg(not(madsim))]
202            {
203                self.client.poll(Duration::ZERO)
204            }
205            #[cfg(madsim)]
206            {
207                self.client.poll(Duration::ZERO).await
208            }
209        } && !is_expected_no_group_poll_error(&poll_err)
210        {
211            tracing::warn!(
212                error = %poll_err.as_report(),
213                topic = self.topic,
214                broker_address = self.broker_address,
215                "failed to poll kafka client");
216        }
217
218        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
219            format!(
220                "failed to fetch metadata from kafka ({})",
221                self.broker_address
222            )
223        })?;
224
225        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
226        let mut start_offsets = self
227            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
228            .await?;
229
230        let mut stop_offsets = self
231            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
232            .await?;
233
234        let ret: Vec<_> = topic_partitions
235            .into_iter()
236            .map(|partition| KafkaSplit {
237                topic: self.topic.clone(),
238                partition,
239                start_offset: start_offsets.remove(&partition).unwrap(),
240                stop_offset: stop_offsets.remove(&partition).unwrap(),
241            })
242            .collect();
243
244        Ok(ret)
245    }
246
    /// Deletes the consumer groups of the given fragments by delegating to
    /// `drop_consumer_groups`.
    async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
250
    /// Deletes the consumer groups of the given fragments by delegating to
    /// `drop_consumer_groups`.
    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
254}
255
256fn is_expected_no_group_poll_error(error: &KafkaError) -> bool {
257    matches!(
258        error,
259        KafkaError::MessageConsumption(RDKafkaErrorCode::UnknownGroup)
260    )
261}
262
263async fn build_kafka_client(
264    config: &ClientConfig,
265    properties: &KafkaProperties,
266) -> ConnectorResult<Arc<KafkaConsumer>> {
267    let ctx_common = KafkaContextCommon::new(
268        properties.privatelink_common.broker_rewrite_map.clone(),
269        None,
270        None,
271        properties.aws_auth_props.clone(),
272        properties.connection.is_aws_msk_iam(),
273    )
274    .await?;
275    let client_ctx = RwConsumerContext::new(ctx_common);
276    let client: KafkaConsumer = config.create_with_context(client_ctx).await?;
277
278    // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call
279    // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either
280    // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval
281    // of an initial token to occur.
282    // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf
283    if properties.connection.is_aws_msk_iam() {
284        #[cfg(not(madsim))]
285        client.poll(Duration::from_secs(10)); // note: this is a blocking call
286        #[cfg(madsim)]
287        client.poll(Duration::from_secs(10)).await;
288    }
289    Ok(Arc::new(client))
290}
291async fn build_kafka_admin(
292    config: &ClientConfig,
293    properties: &KafkaProperties,
294) -> ConnectorResult<KafkaAdmin> {
295    let ctx_common = KafkaContextCommon::new(
296        properties.privatelink_common.broker_rewrite_map.clone(),
297        None,
298        None,
299        properties.aws_auth_props.clone(),
300        properties.connection.is_aws_msk_iam(),
301    )
302    .await?;
303    let client_ctx = RwConsumerContext::new(ctx_common);
304    let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
305    // AdminClient calls start_poll_thread on creation, so the additional poll seems not needed. (And currently no API for this.)
306    Ok(client)
307}
308
309impl KafkaSplitEnumerator {
310    async fn get_watermarks(
311        &mut self,
312        partitions: &[i32],
313    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
314        let mut map = HashMap::new();
315        for partition in partitions {
316            let (low, high) = self
317                .client
318                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
319                .await?;
320            self.report_high_watermark(*partition, high);
321            map.insert(*partition, (low, high));
322        }
323        tracing::debug!("fetch kafka watermarks: {map:?}");
324        Ok(map)
325    }
326
327    pub async fn list_splits_batch(
328        &mut self,
329        expect_start_timestamp_millis: Option<i64>,
330        expect_stop_timestamp_millis: Option<i64>,
331    ) -> ConnectorResult<Vec<KafkaSplit>> {
332        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
333            format!(
334                "failed to fetch metadata from kafka ({})",
335                self.broker_address
336            )
337        })?;
338
339        // Watermark here has nothing to do with watermark in streaming processing. Watermark
340        // here means smallest/largest offset available for reading.
341        let mut watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
342
343        // here we are getting the start offset and end offset for each partition with the given
344        // timestamp if the timestamp is None, we will use the low watermark and high
345        // watermark as the start and end offset if the timestamp is provided, we will use
346        // the watermark to narrow down the range
347        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
348            Some(
349                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
350                    .await?,
351            )
352        } else {
353            None
354        };
355
356        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
357            Some(
358                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
359                    .await?,
360            )
361        } else {
362            None
363        };
364
365        Ok(topic_partitions
366            .iter()
367            .map(|partition| {
368                let (low, high) = watermarks.remove(partition).unwrap();
369                let start_offset = {
370                    let earliest_offset = low - 1;
371                    let start = expect_start_offset
372                        .as_mut()
373                        .map(|m| m.remove(partition).flatten().unwrap_or(earliest_offset))
374                        .unwrap_or(earliest_offset);
375                    i64::max(start, earliest_offset)
376                };
377                let stop_offset = {
378                    let stop = expect_stop_offset
379                        .as_mut()
380                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
381                        .unwrap_or(Some(high))
382                        .unwrap_or(high);
383                    i64::min(stop, high)
384                };
385
386                if start_offset > stop_offset {
387                    tracing::warn!(
388                        "Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
389                        self.topic,
390                        partition,
391                        start_offset,
392                        stop_offset
393                    );
394                }
395                KafkaSplit {
396                    topic: self.topic.clone(),
397                    partition: *partition,
398                    start_offset: Some(start_offset),
399                    stop_offset: Some(stop_offset),
400                }
401            })
402            .collect::<Vec<KafkaSplit>>())
403    }
404
405    async fn fetch_stop_offset(
406        &self,
407        partitions: &[i32],
408        watermarks: &HashMap<i32, (i64, i64)>,
409    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
410        match self.stop_offset {
411            KafkaEnumeratorOffset::Earliest => unreachable!(),
412            KafkaEnumeratorOffset::Latest => {
413                let mut map = HashMap::new();
414                for partition in partitions {
415                    let (_, high_watermark) = watermarks.get(partition).unwrap();
416                    map.insert(*partition, Some(*high_watermark));
417                }
418                Ok(map)
419            }
420            KafkaEnumeratorOffset::Timestamp(time) => {
421                self.fetch_offset_for_time(partitions, time, watermarks)
422                    .await
423            }
424            KafkaEnumeratorOffset::None => partitions
425                .iter()
426                .map(|partition| Ok((*partition, None)))
427                .collect(),
428        }
429    }
430
431    async fn fetch_start_offset(
432        &self,
433        partitions: &[i32],
434        watermarks: &HashMap<i32, (i64, i64)>,
435    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
436        match self.start_offset {
437            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
438                let mut map = HashMap::new();
439                for partition in partitions {
440                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
441                    let offset = match self.start_offset {
442                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
443                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
444                        _ => unreachable!(),
445                    };
446                    map.insert(*partition, Some(offset));
447                }
448                Ok(map)
449            }
450            KafkaEnumeratorOffset::Timestamp(time) => {
451                self.fetch_offset_for_time(partitions, time, watermarks)
452                    .await
453            }
454            KafkaEnumeratorOffset::None => partitions
455                .iter()
456                .map(|partition| Ok((*partition, None)))
457                .collect(),
458        }
459    }
460
461    async fn fetch_offset_for_time(
462        &self,
463        partitions: &[i32],
464        time: i64,
465        watermarks: &HashMap<i32, (i64, i64)>,
466    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
467        let mut tpl = TopicPartitionList::new();
468
469        for partition in partitions {
470            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
471        }
472
473        let offsets = self
474            .client
475            .offsets_for_times(tpl, self.sync_call_timeout)
476            .await?;
477
478        let mut result = HashMap::with_capacity(partitions.len());
479
480        for elem in offsets.elements_for_topic(self.topic.as_str()) {
481            match elem.offset() {
482                Offset::Offset(offset) => {
483                    // XXX(rc): currently in RW source, `offset` means the last consumed offset, so we need to subtract 1
484                    result.insert(elem.partition(), Some(offset - 1));
485                }
486                Offset::End => {
487                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
488                    tracing::info!(
489                        source_id = %self.context.info.source_id,
490                        "no message found before timestamp {} (ms) for partition {}, start from latest",
491                        time,
492                        elem.partition()
493                    );
494                    result.insert(elem.partition(), Some(high_watermark - 1)); // align to Latest
495                }
496                Offset::Invalid => {
497                    // special case for madsim test
498                    // For a read Kafka, it returns `Offset::Latest` when the timestamp is later than the latest message in the partition
499                    // But in madsim, it returns `Offset::Invalid`
500                    // So we align to Latest here
501                    tracing::info!(
502                        source_id = %self.context.info.source_id,
503                        "got invalid offset for partition  {} at timestamp {}, align to latest",
504                        elem.partition(),
505                        time
506                    );
507                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
508                    result.insert(elem.partition(), Some(high_watermark - 1)); // align to Latest
509                }
510                Offset::Beginning => {
511                    let (low, _) = watermarks.get(&elem.partition()).unwrap();
512                    tracing::info!(
513                        source_id = %self.context.info.source_id,
514                        "all message in partition {} is after timestamp {} (ms), start from earliest",
515                        elem.partition(),
516                        time,
517                    );
518                    result.insert(elem.partition(), Some(low - 1)); // align to Earliest
519                }
520                err_offset @ Offset::Stored | err_offset @ Offset::OffsetTail(_) => {
521                    tracing::error!(
522                        source_id = %self.context.info.source_id,
523                        "got invalid offset for partition {}: {err_offset:?}",
524                        elem.partition(),
525                        err_offset = err_offset,
526                    );
527                    return Err(KafkaError::OffsetFetch(RDKafkaErrorCode::NoOffset));
528                }
529            }
530        }
531
532        Ok(result)
533    }
534
535    #[inline]
536    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
537        let high_watermark_metrics =
538            self.high_watermark_metrics
539                .entry(partition)
540                .or_insert_with(|| {
541                    self.context
542                        .metrics
543                        .high_watermark
544                        .with_guarded_label_values(&[
545                            &self.context.info.source_id.to_string(),
546                            &partition.to_string(),
547                        ])
548                });
549        high_watermark_metrics.set(offset);
550    }
551
552    pub async fn check_reachability(&self) -> ConnectorResult<()> {
553        let _ = self
554            .client
555            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
556            .await?;
557        Ok(())
558    }
559
560    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
561        // for now, we only support one topic
562        let metadata = self
563            .client
564            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
565            .await?;
566
567        let topic_meta = match metadata.topics() {
568            [meta] => meta,
569            _ => bail!("topic {} not found", self.topic),
570        };
571
572        if topic_meta.partitions().is_empty() {
573            bail!("topic {} not found", self.topic);
574        }
575
576        Ok(topic_meta
577            .partitions()
578            .iter()
579            .map(|partition| partition.id())
580            .collect())
581    }
582}