use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use moka::ops::compute::Op;
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
use risingwave_common::bail;
use risingwave_common::metrics::LabelGuardedIntGauge;

use crate::error::{ConnectorError, ConnectorResult};
use crate::source::SourceEnumeratorContextRef;
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
    RwConsumerContext,
};

type KafkaConsumer = BaseConsumer<RwConsumerContext>;
type KafkaAdmin = AdminClient<RwConsumerContext>;

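/// Consumers are shared per connection. Entries hold `Weak` references, so a
/// consumer is dropped once the last enumerator using it goes away; a dead entry
/// is simply replaced on the next lookup.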
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());
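
/// Admin clients are held by `Arc`, so the cache itself keeps them alive; the
/// five-minute time-to-idle bounds how long an unused admin client lingers.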
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
    LazyLock::new(|| {
        moka::future::Cache::builder()
            .time_to_idle(Duration::from_secs(5 * 60))
            .build()
    });

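/// How the enumerator resolves an offset: the earliest or latest offset currently
/// available in the partition, the offset corresponding to a Unix timestamp in
/// milliseconds, or `None` to leave the offset unspecified.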
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
    Earliest,
    Latest,
    Timestamp(i64),
    None,
}

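/// Split enumerator for Kafka: discovers the partitions of a topic and assigns
/// each one a split with resolved start/stop offsets.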
pub struct KafkaSplitEnumerator {
    context: SourceEnumeratorContextRef,
    broker_address: String,
    topic: String,
    client: Arc<KafkaConsumer>,
    start_offset: KafkaEnumeratorOffset,

    stop_offset: KafkaEnumeratorOffset,

    sync_call_timeout: Duration,
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge<2>>,

    properties: KafkaProperties,
    config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {
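    /// Deletes the consumer groups owned by the given fragments via a shared admin
    /// client (one group id is derived per fragment id).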
    async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        let admin = SHARED_KAFKA_ADMIN
            .try_get_with_by_ref(&self.properties.connection, async {
                tracing::info!("build new kafka admin for {}", self.broker_address);
                Ok(Arc::new(
                    build_kafka_admin(&self.config, &self.properties).await?,
                ))
            })
            .await?;

        let group_ids = fragment_ids
            .iter()
            .map(|fragment_id| self.properties.group_id(*fragment_id))
            .collect::<Vec<_>>();
        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
        let res = admin
            .delete_groups(&group_ids, &AdminOptions::default())
            .await?;
        tracing::debug!(
            topic = self.topic,
            ?fragment_ids,
            "delete groups result: {res:?}"
        );
        Ok(())
    }
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    async fn new(
        properties: KafkaProperties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<KafkaSplitEnumerator> {
        let mut config = rdkafka::ClientConfig::new();
        let common_props = &properties.common;

        let broker_address = properties.connection.brokers.clone();
        let topic = common_props.topic.clone();
        config.set("bootstrap.servers", &broker_address);
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);
        let mut scan_start_offset = match properties
            .scan_startup_mode
            .as_ref()
            .map(|s| s.to_lowercase())
            .as_deref()
        {
            Some("earliest") => KafkaEnumeratorOffset::Earliest,
            Some("latest") => KafkaEnumeratorOffset::Latest,
            None => KafkaEnumeratorOffset::Earliest,
            _ => bail!(
                "property `scan_startup_mode` only supports `earliest` and `latest`, or can be left empty"
            ),
        };

        if let Some(s) = &properties.time_offset {
            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
        }

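        // Reuse a live consumer for this connection when one exists; otherwise build
        // a new one and publish a `Weak` handle so later enumerators can share it.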
        let mut client: Option<Arc<KafkaConsumer>> = None;
        SHARED_KAFKA_CONSUMER
            .entry_by_ref(&properties.connection)
            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
                if let Some(entry) = maybe_entry {
                    let entry_value = entry.into_value();
                    if let Some(client_) = entry_value.upgrade() {
                        tracing::info!("reuse existing kafka client for {}", broker_address);
                        client = Some(client_);
                        return Ok(Op::Nop);
                    }
                }
                tracing::info!("build new kafka client for {}", broker_address);
                client = Some(build_kafka_client(&config, &properties).await?);
                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
            })
            .await?;

        Ok(Self {
            context,
            broker_address,
            topic,
            client: client.unwrap(),
            start_offset: scan_start_offset,
            stop_offset: KafkaEnumeratorOffset::None,
            sync_call_timeout: properties.common.sync_call_timeout,
            high_watermark_metrics: HashMap::new(),
            properties,
            config,
        })
    }

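    /// Lists one split per partition; start/stop offsets are resolved from the
    /// configured enumerator offsets and the partitions' current watermarks.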
    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
        let mut start_offsets = self
            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let mut stop_offsets = self
            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let ret: Vec<_> = topic_partitions
            .into_iter()
            .map(|partition| KafkaSplit {
                topic: self.topic.clone(),
                partition,
                start_offset: start_offsets.remove(&partition).unwrap(),
                stop_offset: stop_offsets.remove(&partition).unwrap(),
            })
            .collect();

        Ok(ret)
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}

async fn build_kafka_client(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<Arc<KafkaConsumer>> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaConsumer = config.create_with_context(client_ctx).await?;

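    // SASL/OAUTHBEARER (used by AWS MSK IAM) requires at least one poll so that
    // librdkafka runs the token-refresh callback before any broker is contacted.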
    if properties.connection.is_aws_msk_iam() {
        #[cfg(not(madsim))]
        client.poll(Duration::from_secs(10));
        #[cfg(madsim)]
        client.poll(Duration::from_secs(10)).await;
    }
    Ok(Arc::new(client))
}

async fn build_kafka_admin(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<KafkaAdmin> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
    Ok(client)
}

impl KafkaSplitEnumerator {
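    /// Fetches the (low, high) watermark pair for every partition, reporting each
    /// high watermark to the source metrics as a side effect.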
    async fn get_watermarks(
        &mut self,
        partitions: &[i32],
    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
        let mut map = HashMap::new();
        for partition in partitions {
            let (low, high) = self
                .client
                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                .await?;
            self.report_high_watermark(*partition, high);
            map.insert(*partition, (low, high));
        }
        tracing::debug!("fetch kafka watermarks: {map:?}");
        Ok(map)
    }

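    /// Batch variant of split listing: resolves the requested timestamp range to
    /// concrete offsets and clamps them to what each partition actually holds.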
    pub async fn list_splits_batch(
        &mut self,
        expect_start_timestamp_millis: Option<i64>,
        expect_stop_timestamp_millis: Option<i64>,
    ) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

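        // If a start/stop timestamp is given, translate it into per-partition offsets;
        // otherwise leave `None` and fall back to the watermarks below.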
        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts)
                    .await?,
            )
        } else {
            None
        };

        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts)
                    .await?,
            )
        } else {
            None
        };

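        // "Watermark" here is Kafka's term for the smallest/largest offset available
        // in a partition, unrelated to streaming watermarks. The low watermark is
        // shifted by -1 because split offsets denote the last consumed offset, i.e.
        // reading starts at `offset + 1`.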
        let mut watermarks = {
            let mut ret = HashMap::new();
            for partition in &topic_partitions {
                let (low, high) = self
                    .client
                    .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                    .await?;
                ret.insert(partition, (low - 1, high));
            }
            ret
        };

        Ok(topic_partitions
            .iter()
            .map(|partition| {
                let (low, high) = watermarks.remove(&partition).unwrap();
                let start_offset = {
                    let start = expect_start_offset
                        .as_mut()
                        .map(|m| m.remove(partition).flatten().map(|t| t - 1).unwrap_or(low))
                        .unwrap_or(low);
                    i64::max(start, low)
                };
                let stop_offset = {
                    let stop = expect_stop_offset
                        .as_mut()
                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
                        .unwrap_or(Some(high))
                        .unwrap_or(high);
                    i64::min(stop, high)
                };

                if start_offset > stop_offset {
                    tracing::warn!(
                        "topic {} partition {}: requested start offset {} is greater than stop offset {}",
                        self.topic,
                        partition,
                        start_offset,
                        stop_offset
                    );
                }
                KafkaSplit {
                    topic: self.topic.clone(),
                    partition: *partition,
                    start_offset: Some(start_offset),
                    stop_offset: Some(stop_offset),
                }
            })
            .collect::<Vec<KafkaSplit>>())
    }

    async fn fetch_stop_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.stop_offset {
            KafkaEnumeratorOffset::Earliest => unreachable!(),
            KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (_, high_watermark) = watermarks.get(partition).unwrap();
                    map.insert(*partition, Some(*high_watermark));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time).await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

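    /// Resolves the per-partition start offset. Offsets are expressed as the last
    /// consumed offset (consumption resumes at `offset + 1`), hence the `- 1`
    /// adjustments relative to the raw watermarks.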
    async fn fetch_start_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.start_offset {
            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
                    let offset = match self.start_offset {
                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
                        _ => unreachable!(),
                    };
                    map.insert(*partition, Some(offset));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time).await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

    async fn fetch_offset_for_time(
        &self,
        partitions: &[i32],
        time: i64,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        let mut tpl = TopicPartitionList::new();

        for partition in partitions {
            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
        }

        let offsets = self
            .client
            .offsets_for_times(tpl, self.sync_call_timeout)
            .await?;

        let mut result = HashMap::with_capacity(partitions.len());

        for elem in offsets.elements_for_topic(self.topic.as_str()) {
            match elem.offset() {
                Offset::Offset(offset) => {
                    // `offset` is the first message at or after the timestamp; store
                    // `offset - 1` since split offsets denote the last consumed offset.
                    result.insert(elem.partition(), Some(offset - 1));
                }
                _ => {
                    // No message at or after the timestamp; fall back to the high
                    // watermark so consumption starts at the end of the partition.
                    let (_, high_watermark) = self
                        .client
                        .fetch_watermarks(
                            self.topic.as_str(),
                            elem.partition(),
                            self.sync_call_timeout,
                        )
                        .await?;
                    result.insert(elem.partition(), Some(high_watermark));
                }
            }
        }

        Ok(result)
    }

    #[inline]
    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
        let high_watermark_metrics =
            self.high_watermark_metrics
                .entry(partition)
                .or_insert_with(|| {
                    self.context
                        .metrics
                        .high_watermark
                        .with_guarded_label_values(&[
                            &self.context.info.source_id.to_string(),
                            &partition.to_string(),
                        ])
                });
        high_watermark_metrics.set(offset);
    }

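    /// Returns whether topic metadata can be fetched within the sync-call timeout.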
    pub async fn check_reachability(&self) -> bool {
        self.client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await
            .is_ok()
    }

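    /// Fetches the partition ids of the topic. Metadata is requested for this one
    /// topic only, so exactly one topic entry is expected back.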
    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
        let metadata = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;

        let topic_meta = match metadata.topics() {
            [meta] => meta,
            _ => bail!("topic {} not found", self.topic),
        };

        if topic_meta.partitions().is_empty() {
            bail!("topic {} not found", self.topic);
        }

        Ok(topic_meta
            .partitions()
            .iter()
            .map(|partition| partition.id())
            .collect())
    }
}