risingwave_connector/source/kafka/enumerator.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use moka::ops::compute::Op;
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
use risingwave_common::bail;
use risingwave_common::metrics::LabelGuardedIntGauge;

use crate::connector_common::read_kafka_log_level;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::SourceEnumeratorContextRef;
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
    RwConsumerContext,
};

type KafkaConsumer = BaseConsumer<RwConsumerContext>;
type KafkaAdmin = AdminClient<RwConsumerContext>;

/// Consumer clients are shared, and the cache doesn't manage their lifecycle, so we store `Weak` references and use no eviction policy.
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());
/// Admin clients are short-lived, so we store `Arc` and set a time-to-idle eviction policy.
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
    LazyLock::new(|| {
        moka::future::Cache::builder()
            .time_to_idle(Duration::from_secs(5 * 60))
            .build()
    });

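/// Offset position used by the enumerator to resolve per-partition start/stop offsets when listing splits.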
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
    Earliest,
    Latest,
    Timestamp(i64),
    None,
}

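/// Split enumerator for Kafka sources: discovers the partitions of a topic and computes per-partition start/stop offsets.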
pub struct KafkaSplitEnumerator {
    context: SourceEnumeratorContextRef,
    broker_address: String,
    topic: String,
    client: Arc<KafkaConsumer>,
    start_offset: KafkaEnumeratorOffset,

    // maybe used in the future for batch processing
    stop_offset: KafkaEnumeratorOffset,

    sync_call_timeout: Duration,
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge>,

    properties: KafkaProperties,
    config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {
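    /// Deletes the consumer groups derived from the given fragment ids, using a shared Kafka admin client for this connection.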
    async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        let admin = SHARED_KAFKA_ADMIN
            .try_get_with_by_ref(&self.properties.connection, async {
                tracing::info!("build new kafka admin for {}", self.broker_address);
                Ok(Arc::new(
                    build_kafka_admin(&self.config, &self.properties).await?,
                ))
            })
            .await?;

        let group_ids = fragment_ids
            .iter()
            .map(|fragment_id| self.properties.group_id(*fragment_id))
            .collect::<Vec<_>>();
        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
        let res = admin
            .delete_groups(&group_ids, &AdminOptions::default())
            .await?;
        tracing::debug!(
            topic = self.topic,
            ?fragment_ids,
            "delete groups result: {res:?}"
        );
        Ok(())
    }
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    async fn new(
        properties: KafkaProperties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<KafkaSplitEnumerator> {
        let mut config = rdkafka::ClientConfig::new();
        let common_props = &properties.common;

        let broker_address = properties.connection.brokers.clone();
        let topic = common_props.topic.clone();
        config.set("bootstrap.servers", &broker_address);
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);
        let mut scan_start_offset = match properties
            .scan_startup_mode
            .as_ref()
            .map(|s| s.to_lowercase())
            .as_deref()
        {
            Some("earliest") => KafkaEnumeratorOffset::Earliest,
            Some("latest") => KafkaEnumeratorOffset::Latest,
            None => KafkaEnumeratorOffset::Earliest,
            _ => bail!(
                "property `scan_startup_mode` only supports `earliest` and `latest`, or leaving it empty"
            ),
        };

        if let Some(s) = &properties.time_offset {
            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
        }

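        // Reuse a cached consumer for this connection if one is still alive; otherwise build a new one and cache a `Weak` handle to it.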
        let mut client: Option<Arc<KafkaConsumer>> = None;
        SHARED_KAFKA_CONSUMER
            .entry_by_ref(&properties.connection)
            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
                if let Some(entry) = maybe_entry {
                    let entry_value = entry.into_value();
                    if let Some(client_) = entry_value.upgrade() {
                        // return if the client is already built
                        tracing::info!("reuse existing kafka client for {}", broker_address);
                        client = Some(client_);
                        return Ok(Op::Nop);
                    }
                }
                tracing::info!("build new kafka client for {}", broker_address);
                client = Some(build_kafka_client(&config, &properties).await?);
                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
            })
            .await?;

        Ok(Self {
            context,
            broker_address,
            topic,
            client: client.unwrap(),
            start_offset: scan_start_offset,
            stop_offset: KafkaEnumeratorOffset::None,
            sync_call_timeout: properties.common.sync_call_timeout,
            high_watermark_metrics: HashMap::new(),
            properties,
            config,
        })
    }

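    /// Lists one split per topic partition, with start/stop offsets resolved from the configured scan positions.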
    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
        let mut start_offsets = self
            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let mut stop_offsets = self
            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let ret: Vec<_> = topic_partitions
            .into_iter()
            .map(|partition| KafkaSplit {
                topic: self.topic.clone(),
                partition,
                start_offset: start_offsets.remove(&partition).unwrap(),
                stop_offset: stop_offsets.remove(&partition).unwrap(),
            })
            .collect();

        Ok(ret)
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}

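/// Builds a `BaseConsumer` with the RisingWave consumer context (broker rewrite map, optional AWS MSK IAM auth); used for metadata and offset queries.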
async fn build_kafka_client(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<Arc<KafkaConsumer>> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaConsumer = config.create_with_context(client_ctx).await?;

    // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call
    // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either
    // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval
    // of an initial token to occur.
    // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf
    if properties.connection.is_aws_msk_iam() {
        #[cfg(not(madsim))]
        client.poll(Duration::from_secs(10)); // note: this is a blocking call
        #[cfg(madsim)]
        client.poll(Duration::from_secs(10)).await;
    }
    Ok(Arc::new(client))
}
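
/// Builds an `AdminClient` with the same RisingWave consumer context; used by `drop_consumer_groups` for consumer-group cleanup.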
async fn build_kafka_admin(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<KafkaAdmin> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
    // `AdminClient` calls `start_poll_thread` on creation, so the additional poll seems unnecessary. (And there is currently no API for it anyway.)
    Ok(client)
}

impl KafkaSplitEnumerator {
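    /// Fetches the (low, high) watermarks for each partition and reports the high watermark metric.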
    async fn get_watermarks(
        &mut self,
        partitions: &[i32],
    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
        let mut map = HashMap::new();
        for partition in partitions {
            let (low, high) = self
                .client
                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                .await?;
            self.report_high_watermark(*partition, high);
            map.insert(*partition, (low, high));
        }
        tracing::debug!("fetch kafka watermarks: {map:?}");
        Ok(map)
    }

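    /// Lists splits for a batch read, optionally bounded by start/stop timestamps in milliseconds.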
    pub async fn list_splits_batch(
        &mut self,
        expect_start_timestamp_millis: Option<i64>,
        expect_stop_timestamp_millis: Option<i64>,
    ) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        // Get the start and stop offset for each partition from the given timestamps.
        // If a timestamp is `None`, we use the low/high watermark as the start/stop offset;
        // if a timestamp is provided, the watermarks are still used to narrow down the range.
        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts)
                    .await?,
            )
        } else {
            None
        };

        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts)
                    .await?,
            )
        } else {
            None
        };

        // Watermark here has nothing to do with watermark in streaming processing. Watermark
        // here means smallest/largest offset available for reading.
        let mut watermarks = {
            let mut ret = HashMap::new();
            for partition in &topic_partitions {
                let (low, high) = self
                    .client
                    .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                    .await?;
                ret.insert(partition, (low - 1, high));
            }
            ret
        };

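        // Offsets in `KafkaSplit` follow the "last consumed offset" convention used by RW sources,
        // hence the `- 1` adjustments above and below when converting offsets to read from.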
        Ok(topic_partitions
            .iter()
            .map(|partition| {
                let (low, high) = watermarks.remove(&partition).unwrap();
                let start_offset = {
                    let start = expect_start_offset
                        .as_mut()
                        .map(|m| m.remove(partition).flatten().map(|t| t - 1).unwrap_or(low))
                        .unwrap_or(low);
                    i64::max(start, low)
                };
                let stop_offset = {
                    let stop = expect_stop_offset
                        .as_mut()
                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
                        .unwrap_or(Some(high))
                        .unwrap_or(high);
                    i64::min(stop, high)
                };

                if start_offset > stop_offset {
                    tracing::warn!(
                        "Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
                        self.topic,
                        partition,
                        start_offset,
                        stop_offset
                    );
                }
                KafkaSplit {
                    topic: self.topic.clone(),
                    partition: *partition,
                    start_offset: Some(start_offset),
                    stop_offset: Some(stop_offset),
                }
            })
            .collect::<Vec<KafkaSplit>>())
    }

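    /// Resolves per-partition stop offsets from the configured stop position: the high watermark for `Latest`, a timestamp lookup for `Timestamp`, or `None` for an unbounded read.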
    async fn fetch_stop_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.stop_offset {
            KafkaEnumeratorOffset::Earliest => unreachable!(),
            KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (_, high_watermark) = watermarks.get(partition).unwrap();
                    map.insert(*partition, Some(*high_watermark));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time).await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

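    /// Resolves per-partition start offsets from the configured start position: the low/high watermark minus one for `Earliest`/`Latest`, a timestamp lookup for `Timestamp`, or `None`.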
    async fn fetch_start_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.start_offset {
            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
                    let offset = match self.start_offset {
                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
                        _ => unreachable!(),
                    };
                    map.insert(*partition, Some(offset));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time).await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

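    /// For each partition, looks up the offset of the first message at or after `time` and returns it as a "last consumed" offset (hence the `- 1`); falls back to the high watermark when no such offset exists.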
    async fn fetch_offset_for_time(
        &self,
        partitions: &[i32],
        time: i64,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        let mut tpl = TopicPartitionList::new();

        for partition in partitions {
            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
        }

        let offsets = self
            .client
            .offsets_for_times(tpl, self.sync_call_timeout)
            .await?;

        let mut result = HashMap::with_capacity(partitions.len());

        for elem in offsets.elements_for_topic(self.topic.as_str()) {
            match elem.offset() {
                Offset::Offset(offset) => {
                    // XXX(rc): currently in RW source, `offset` means the last consumed offset, so we need to subtract 1
                    result.insert(elem.partition(), Some(offset - 1));
                }
                _ => {
                    let (_, high_watermark) = self
                        .client
                        .fetch_watermarks(
                            self.topic.as_str(),
                            elem.partition(),
                            self.sync_call_timeout,
                        )
                        .await?;
                    result.insert(elem.partition(), Some(high_watermark));
                }
            }
        }

        Ok(result)
    }

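    /// Lazily creates the high-watermark gauge for the partition and sets it to the latest value.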
    #[inline]
    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
        let high_watermark_metrics =
            self.high_watermark_metrics
                .entry(partition)
                .or_insert_with(|| {
                    self.context
                        .metrics
                        .high_watermark
                        .with_guarded_label_values(&[
                            &self.context.info.source_id.to_string(),
                            &partition.to_string(),
                        ])
                });
        high_watermark_metrics.set(offset);
    }

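    /// Returns `true` if topic metadata can be fetched within the sync call timeout.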
    pub async fn check_reachability(&self) -> bool {
        self.client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await
            .is_ok()
    }

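    /// Fetches metadata for the configured topic and returns its partition ids.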
    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
        // for now, we only support one topic
        let metadata = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;

        let topic_meta = match metadata.topics() {
            [meta] => meta,
            _ => bail!("topic {} not found", self.topic),
        };

        if topic_meta.partitions().is_empty() {
            bail!("topic {} not found", self.topic);
        }

        Ok(topic_meta
            .partitions()
            .iter()
            .map(|partition| partition.id())
            .collect())
    }
}
511}