risingwave_connector/source/kafka/enumerator.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

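//! Split enumerator for the Kafka source: discovers topic partitions, resolves
//! start/stop offsets, and manages shared consumer/admin clients.
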
use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use moka::ops::compute::Op;
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
use risingwave_common::bail;
use risingwave_common::id::FragmentId;
use risingwave_common::metrics::LabelGuardedIntGauge;

use crate::connector_common::read_kafka_log_level;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::SourceEnumeratorContextRef;
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
    RwConsumerContext,
};

type KafkaConsumer = BaseConsumer<RwConsumerContext>;
type KafkaAdmin = AdminClient<RwConsumerContext>;

/// The consumer client is shared, and the cache does not manage its lifecycle, so we
/// store a `Weak` reference and apply no eviction policy.
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());
/// The admin client is short-lived, so we store an `Arc` and set a time-to-idle eviction policy.
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
    LazyLock::new(|| {
        moka::future::Cache::builder()
            .time_to_idle(Duration::from_secs(5 * 60))
            .build()
    });

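/// Offset position used when enumerating splits, resolved from the `scan_startup_mode`
/// and `time_offset` properties.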
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
    Earliest,
    Latest,
    Timestamp(i64),
    None,
}

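/// Split enumerator for Kafka: maps each partition of the topic to a [`KafkaSplit`].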
pub struct KafkaSplitEnumerator {
    context: SourceEnumeratorContextRef,
    broker_address: String,
    topic: String,
    client: Arc<KafkaConsumer>,
    start_offset: KafkaEnumeratorOffset,

    // May be used in the future for batch processing.
    stop_offset: KafkaEnumeratorOffset,

    sync_call_timeout: Duration,
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge>,

    properties: KafkaProperties,
    config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {
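    /// Deletes the consumer groups derived from the given fragment ids, using a Kafka
    /// admin client built lazily and cached in `SHARED_KAFKA_ADMIN`.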
    async fn drop_consumer_groups(&self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
        let admin = Box::pin(SHARED_KAFKA_ADMIN.try_get_with_by_ref(
            &self.properties.connection,
            async {
                tracing::info!("build new kafka admin for {}", self.broker_address);
                Ok(Arc::new(
                    build_kafka_admin(&self.config, &self.properties).await?,
                ))
            },
        ))
        .await?;

        let group_ids = fragment_ids
            .iter()
            .map(|fragment_id| self.properties.group_id(*fragment_id))
            .collect::<Vec<_>>();
        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
        let res = admin
            .delete_groups(&group_ids, &AdminOptions::default())
            .await?;
        tracing::debug!(
            topic = self.topic,
            ?fragment_ids,
            "delete groups result: {res:?}"
        );
        Ok(())
    }
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

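    /// Creates the enumerator: builds the rdkafka client config, resolves the startup
    /// offset, and reuses a cached consumer client for the same connection if one is still alive.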
    async fn new(
        properties: KafkaProperties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<KafkaSplitEnumerator> {
        let mut config = rdkafka::ClientConfig::new();
        let common_props = &properties.common;

        let broker_address = properties.connection.brokers.clone();
        let topic = common_props.topic.clone();
        config.set("bootstrap.servers", &broker_address);
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);
        let mut scan_start_offset = match properties
            .scan_startup_mode
            .as_ref()
            .map(|s| s.to_lowercase())
            .as_deref()
        {
            Some("earliest") => KafkaEnumeratorOffset::Earliest,
            Some("latest") => KafkaEnumeratorOffset::Latest,
            None => KafkaEnumeratorOffset::Earliest,
            _ => bail!(
                "property `scan_startup_mode` only supports `earliest` and `latest`, or can be left empty"
            ),
        };

        if let Some(s) = &properties.time_offset {
            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
        }

        let mut client: Option<Arc<KafkaConsumer>> = None;
        SHARED_KAFKA_CONSUMER
            .entry_by_ref(&properties.connection)
            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
                if let Some(entry) = maybe_entry {
                    let entry_value = entry.into_value();
                    if let Some(client_) = entry_value.upgrade() {
                        // return if the client is already built
                        tracing::info!("reuse existing kafka client for {}", broker_address);
                        client = Some(client_);
                        return Ok(Op::Nop);
                    }
                }
                tracing::info!("build new kafka client for {}", broker_address);
                client = Some(build_kafka_client(&config, &properties).await?);
                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
            })
            .await?;

        Ok(Self {
            context,
            broker_address,
            topic,
            client: client.unwrap(),
            start_offset: scan_start_offset,
            stop_offset: KafkaEnumeratorOffset::None,
            sync_call_timeout: properties.common.sync_call_timeout,
            high_watermark_metrics: HashMap::new(),
            properties,
            config,
        })
    }

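    /// Lists one split per topic partition, with start/stop offsets resolved from the
    /// configured startup mode and the current watermarks.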
    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
        let mut start_offsets = self
            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let mut stop_offsets = self
            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let ret: Vec<_> = topic_partitions
            .into_iter()
            .map(|partition| KafkaSplit {
                topic: self.topic.clone(),
                partition,
                start_offset: start_offsets.remove(&partition).unwrap(),
                stop_offset: stop_offsets.remove(&partition).unwrap(),
            })
            .collect();

        Ok(ret)
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}

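/// Builds a Kafka consumer client with the common RisingWave context (broker rewrite map,
/// AWS auth). For MSK IAM connections, polls once so an initial OAUTHBEARER token is retrieved.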
async fn build_kafka_client(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<Arc<KafkaConsumer>> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaConsumer = config.create_with_context(client_ctx).await?;

    // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call
    // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either
    // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval
    // of an initial token to occur.
    // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf
    if properties.connection.is_aws_msk_iam() {
        #[cfg(not(madsim))]
        client.poll(Duration::from_secs(10)); // note: this is a blocking call
        #[cfg(madsim)]
        client.poll(Duration::from_secs(10)).await;
    }
    Ok(Arc::new(client))
}
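
/// Builds a Kafka admin client with the same connection context as the consumer client.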
async fn build_kafka_admin(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<KafkaAdmin> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
    // `AdminClient` calls `start_poll_thread` on creation, so an additional poll seems
    // unnecessary (and there is currently no API for it).
    Ok(client)
}

impl KafkaSplitEnumerator {
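    /// Fetches the (low, high) watermarks for each partition and reports the
    /// high-watermark metric per partition.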
    async fn get_watermarks(
        &mut self,
        partitions: &[i32],
    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
        let mut map = HashMap::new();
        for partition in partitions {
            let (low, high) = self
                .client
                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                .await?;
            self.report_high_watermark(*partition, high);
            map.insert(*partition, (low, high));
        }
        tracing::debug!("fetch kafka watermarks: {map:?}");
        Ok(map)
    }

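    /// Lists splits for batch ingestion, optionally bounded by start/stop timestamps
    /// (in milliseconds); the resulting offsets are clamped to the partition watermarks.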
    pub async fn list_splits_batch(
        &mut self,
        expect_start_timestamp_millis: Option<i64>,
        expect_stop_timestamp_millis: Option<i64>,
    ) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        // "Watermark" here has nothing to do with watermarks in stream processing; it
        // means the smallest/largest offset available for reading.
        let mut watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;

        // Get the start and end offset for each partition at the given timestamps.
        // If a timestamp is `None`, use the low/high watermark as the start/end offset;
        // if a timestamp is provided, use the watermarks to narrow down the range.
        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        Ok(topic_partitions
            .iter()
            .map(|partition| {
                let (low, high) = watermarks.remove(partition).unwrap();
                let start_offset = {
                    let earliest_offset = low - 1;
                    let start = expect_start_offset
                        .as_mut()
                        .map(|m| m.remove(partition).flatten().unwrap_or(earliest_offset))
                        .unwrap_or(earliest_offset);
                    i64::max(start, earliest_offset)
                };
                let stop_offset = {
                    let stop = expect_stop_offset
                        .as_mut()
                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
                        .unwrap_or(Some(high))
                        .unwrap_or(high);
                    i64::min(stop, high)
                };

                if start_offset > stop_offset {
                    tracing::warn!(
                        "Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
                        self.topic,
                        partition,
                        start_offset,
                        stop_offset
                    );
                }
                KafkaSplit {
                    topic: self.topic.clone(),
                    partition: *partition,
                    start_offset: Some(start_offset),
                    stop_offset: Some(stop_offset),
                }
            })
            .collect::<Vec<KafkaSplit>>())
    }

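    /// Resolves the stop offset for each partition according to `self.stop_offset`;
    /// returns `None` per partition when no stop offset is configured.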
    async fn fetch_stop_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.stop_offset {
            KafkaEnumeratorOffset::Earliest => unreachable!(),
            KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (_, high_watermark) = watermarks.get(partition).unwrap();
                    map.insert(*partition, Some(*high_watermark));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

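    /// Resolves the start offset for each partition according to `self.start_offset`.
    /// In RW sources the recorded offset means the last consumed offset, hence the `- 1`
    /// adjustments below.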
    async fn fetch_start_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.start_offset {
            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
                    let offset = match self.start_offset {
                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
                        _ => unreachable!(),
                    };
                    map.insert(*partition, Some(offset));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

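    /// Looks up, for each partition, the offset of the first message at or after the
    /// given timestamp (ms), falling back to the watermarks for out-of-range results.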
    async fn fetch_offset_for_time(
        &self,
        partitions: &[i32],
        time: i64,
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        let mut tpl = TopicPartitionList::new();

        for partition in partitions {
            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
        }

        let offsets = self
            .client
            .offsets_for_times(tpl, self.sync_call_timeout)
            .await?;

        let mut result = HashMap::with_capacity(partitions.len());

        for elem in offsets.elements_for_topic(self.topic.as_str()) {
            match elem.offset() {
                Offset::Offset(offset) => {
                    // XXX(rc): currently in RW source, `offset` means the last consumed offset, so we need to subtract 1
                    result.insert(elem.partition(), Some(offset - 1));
                }
                Offset::End => {
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = %self.context.info.source_id,
                        "no message found at or after timestamp {} (ms) for partition {}, starting from latest",
                        time,
                        elem.partition()
                    );
                    result.insert(elem.partition(), Some(high_watermark - 1)); // align to Latest
                }
                Offset::Invalid => {
                    // Special case for madsim tests.
                    // A real Kafka returns `Offset::End` when the timestamp is later than the latest
                    // message in the partition, but madsim returns `Offset::Invalid`,
                    // so we align to Latest here as well.
                    tracing::info!(
                        source_id = %self.context.info.source_id,
                        "got invalid offset for partition {} at timestamp {}, align to latest",
                        elem.partition(),
                        time
                    );
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    result.insert(elem.partition(), Some(high_watermark - 1)); // align to Latest
                }
                Offset::Beginning => {
                    let (low, _) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = %self.context.info.source_id,
                        "all messages in partition {} are after timestamp {} (ms), starting from earliest",
                        elem.partition(),
                        time,
                    );
                    result.insert(elem.partition(), Some(low - 1)); // align to Earliest
                }
                err_offset @ Offset::Stored | err_offset @ Offset::OffsetTail(_) => {
                    tracing::error!(
                        source_id = %self.context.info.source_id,
                        "got invalid offset for partition {}: {err_offset:?}",
                        elem.partition(),
                        err_offset = err_offset,
                    );
                    return Err(KafkaError::OffsetFetch(RDKafkaErrorCode::NoOffset));
                }
            }
        }

        Ok(result)
    }

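    /// Updates the per-partition high-watermark gauge, creating the labeled metric on
    /// first use.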
    #[inline]
    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
        let high_watermark_metrics =
            self.high_watermark_metrics
                .entry(partition)
                .or_insert_with(|| {
                    self.context
                        .metrics
                        .high_watermark
                        .with_guarded_label_values(&[
                            &self.context.info.source_id.to_string(),
                            &partition.to_string(),
                        ])
                });
        high_watermark_metrics.set(offset);
    }

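    /// Checks broker reachability by fetching metadata for the topic.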
    pub async fn check_reachability(&self) -> ConnectorResult<()> {
        let _ = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;
        Ok(())
    }

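    /// Fetches the partition ids of the topic, failing if the topic is missing or has
    /// no partitions.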
    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
        // for now, we only support one topic
        let metadata = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;

        let topic_meta = match metadata.topics() {
            [meta] => meta,
            _ => bail!("topic {} not found", self.topic),
        };

        if topic_meta.partitions().is_empty() {
            bail!("topic {} not found", self.topic);
        }

        Ok(topic_meta
            .partitions()
            .iter()
            .map(|partition| partition.id())
            .collect())
    }