1use std::collections::HashMap;
16use std::sync::{Arc, LazyLock, Weak};
17use std::time::Duration;
18
19use anyhow::{Context, anyhow};
20use async_trait::async_trait;
21use moka::future::Cache as MokaCache;
22use moka::ops::compute::Op;
23use rdkafka::admin::{AdminClient, AdminOptions};
24use rdkafka::consumer::BaseConsumer;
25#[cfg(not(madsim))]
26use rdkafka::consumer::Consumer;
27use rdkafka::error::{KafkaError, KafkaResult};
28use rdkafka::types::RDKafkaErrorCode;
29use rdkafka::{ClientConfig, Offset, TopicPartitionList};
30use risingwave_common::bail;
31use risingwave_common::id::FragmentId;
32use risingwave_common::metrics::LabelGuardedIntGauge;
33use thiserror_ext::AsReport;
34
35use crate::connector_common::read_kafka_log_level;
36use crate::error::{ConnectorError, ConnectorResult};
37use crate::source::SourceEnumeratorContextRef;
38use crate::source::base::SplitEnumerator;
39use crate::source::kafka::split::KafkaSplit;
40use crate::source::kafka::{
41 KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
42 RwConsumerContext,
43};
44
45type KafkaConsumer = BaseConsumer<RwConsumerContext>;
46type KafkaAdmin = AdminClient<RwConsumerContext>;
47
48pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
50 LazyLock::new(|| moka::future::Cache::builder().build());
51pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
53 LazyLock::new(|| {
54 moka::future::Cache::builder()
55 .time_to_idle(Duration::from_secs(5 * 60))
56 .build()
57 });
58
/// Where the enumerator should start (or stop) reading a partition.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum KafkaEnumeratorOffset {
    /// Resolved from the partition's low watermark.
    Earliest,
    /// Resolved from the partition's high watermark.
    Latest,
    /// A timestamp in milliseconds, resolved to an offset via `offsets_for_times`.
    Timestamp(i64),
    /// No explicit offset; splits carry `None` for this position.
    None,
}
66
/// Enumerates Kafka splits (one per topic partition) for a source.
pub struct KafkaSplitEnumerator {
    /// Enumerator context; supplies the metrics registry and the source id used in logs/labels.
    context: SourceEnumeratorContextRef,
    /// Value passed as `bootstrap.servers`; also used in log and error messages.
    broker_address: String,
    /// The single topic this enumerator lists partitions for.
    topic: String,
    /// Consumer used for metadata, watermark and offset lookups; possibly shared with other
    /// enumerators on the same connection through `SHARED_KAFKA_CONSUMER`.
    client: Arc<KafkaConsumer>,
    /// Start position derived from `scan_startup_mode` / `time_offset`.
    start_offset: KafkaEnumeratorOffset,

    /// Stop position; `new` always sets this to `KafkaEnumeratorOffset::None`.
    stop_offset: KafkaEnumeratorOffset,

    /// Timeout applied to each synchronous broker call (metadata, watermarks, offsets-for-times).
    sync_call_timeout: Duration,
    /// Per-partition high-watermark gauges, created lazily on first report.
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge>,

    /// Original properties; kept to derive consumer group ids and to build the admin client.
    properties: KafkaProperties,
    /// Client configuration; kept so an admin client can be built lazily with the same settings.
    config: rdkafka::ClientConfig,
}
83
84impl KafkaSplitEnumerator {
85 async fn drop_consumer_groups(&self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
86 let admin = Box::pin(SHARED_KAFKA_ADMIN.try_get_with_by_ref(
87 &self.properties.connection,
88 async {
89 tracing::info!("build new kafka admin for {}", self.broker_address);
90 Ok(Arc::new(
91 build_kafka_admin(&self.config, &self.properties).await?,
92 ))
93 },
94 ))
95 .await?;
96
97 let group_ids = fragment_ids
98 .iter()
99 .map(|fragment_id| self.properties.group_id(*fragment_id))
100 .collect::<Vec<_>>();
101 let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
102 let res = admin
103 .delete_groups(&group_ids, &AdminOptions::default())
104 .await?;
105 tracing::debug!(
106 topic = self.topic,
107 ?fragment_ids,
108 "delete groups result: {res:?}"
109 );
110 Ok(())
111 }
112}
113
114#[async_trait]
115impl SplitEnumerator for KafkaSplitEnumerator {
116 type Properties = KafkaProperties;
117 type Split = KafkaSplit;
118
119 async fn new(
120 properties: KafkaProperties,
121 context: SourceEnumeratorContextRef,
122 ) -> ConnectorResult<KafkaSplitEnumerator> {
123 let mut config = rdkafka::ClientConfig::new();
124 let common_props = &properties.common;
125
126 let broker_address = properties.connection.brokers.clone();
127 let topic = common_props.topic.clone();
128 config.set("bootstrap.servers", &broker_address);
129 config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
130 if let Some(log_level) = read_kafka_log_level() {
131 config.set_log_level(log_level);
132 }
133 properties.connection.set_security_properties(&mut config);
134 properties.set_client(&mut config);
135 config.set("statistics.interval.ms", "0");
139 let mut scan_start_offset = match properties
140 .scan_startup_mode
141 .as_ref()
142 .map(|s| s.to_lowercase())
143 .as_deref()
144 {
145 Some("earliest") => KafkaEnumeratorOffset::Earliest,
146 Some("latest") => KafkaEnumeratorOffset::Latest,
147 None => KafkaEnumeratorOffset::Earliest,
148 _ => bail!(
149 "properties `scan_startup_mode` only supports earliest and latest or leaving it empty"
150 ),
151 };
152
153 if let Some(s) = &properties.time_offset {
154 let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
155 scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
156 }
157
158 let mut client: Option<Arc<KafkaConsumer>> = None;
159 SHARED_KAFKA_CONSUMER
160 .entry_by_ref(&properties.connection)
161 .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
162 if let Some(entry) = maybe_entry {
163 let entry_value = entry.into_value();
164 if let Some(client_) = entry_value.upgrade() {
165 tracing::info!("reuse existing kafka client for {}", broker_address);
167 client = Some(client_);
168 return Ok(Op::Nop);
169 }
170 }
171 tracing::info!("build new kafka client for {}", broker_address);
172 client = Some(build_kafka_client(&config, &properties).await?);
173 Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
174 })
175 .await?;
176
177 Ok(Self {
178 context,
179 broker_address,
180 topic,
181 client: client.unwrap(),
182 start_offset: scan_start_offset,
183 stop_offset: KafkaEnumeratorOffset::None,
184 sync_call_timeout: properties.common.sync_call_timeout,
185 high_watermark_metrics: HashMap::new(),
186 properties,
187 config,
188 })
189 }
190
191 async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
192 if let Some(Err(poll_err)) = {
201 #[cfg(not(madsim))]
202 {
203 self.client.poll(Duration::ZERO)
204 }
205 #[cfg(madsim)]
206 {
207 self.client.poll(Duration::ZERO).await
208 }
209 } && !is_expected_no_group_poll_error(&poll_err)
210 {
211 tracing::warn!(
212 error = %poll_err.as_report(),
213 topic = self.topic,
214 broker_address = self.broker_address,
215 "failed to poll kafka client");
216 }
217
218 let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
219 format!(
220 "failed to fetch metadata from kafka ({})",
221 self.broker_address
222 )
223 })?;
224
225 let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
226 let mut start_offsets = self
227 .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
228 .await?;
229
230 let mut stop_offsets = self
231 .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
232 .await?;
233
234 let ret: Vec<_> = topic_partitions
235 .into_iter()
236 .map(|partition| KafkaSplit {
237 topic: self.topic.clone(),
238 partition,
239 start_offset: start_offsets.remove(&partition).unwrap(),
240 stop_offset: stop_offsets.remove(&partition).unwrap(),
241 })
242 .collect();
243
244 Ok(ret)
245 }
246
247 async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
248 self.drop_consumer_groups(fragment_ids).await
249 }
250
251 async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
252 self.drop_consumer_groups(fragment_ids).await
253 }
254}
255
256fn is_expected_no_group_poll_error(error: &KafkaError) -> bool {
257 matches!(
258 error,
259 KafkaError::MessageConsumption(RDKafkaErrorCode::UnknownGroup)
260 )
261}
262
263async fn build_kafka_client(
264 config: &ClientConfig,
265 properties: &KafkaProperties,
266) -> ConnectorResult<Arc<KafkaConsumer>> {
267 let ctx_common = KafkaContextCommon::new(
268 properties.privatelink_common.broker_rewrite_map.clone(),
269 None,
270 None,
271 properties.aws_auth_props.clone(),
272 properties.connection.is_aws_msk_iam(),
273 )
274 .await?;
275 let client_ctx = RwConsumerContext::new(ctx_common);
276 let client: KafkaConsumer = config.create_with_context(client_ctx).await?;
277
278 if properties.connection.is_aws_msk_iam() {
284 #[cfg(not(madsim))]
285 client.poll(Duration::from_secs(10)); #[cfg(madsim)]
287 client.poll(Duration::from_secs(10)).await;
288 }
289 Ok(Arc::new(client))
290}
291async fn build_kafka_admin(
292 config: &ClientConfig,
293 properties: &KafkaProperties,
294) -> ConnectorResult<KafkaAdmin> {
295 let ctx_common = KafkaContextCommon::new(
296 properties.privatelink_common.broker_rewrite_map.clone(),
297 None,
298 None,
299 properties.aws_auth_props.clone(),
300 properties.connection.is_aws_msk_iam(),
301 )
302 .await?;
303 let client_ctx = RwConsumerContext::new(ctx_common);
304 let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
305 Ok(client)
307}
308
309impl KafkaSplitEnumerator {
310 async fn get_watermarks(
311 &mut self,
312 partitions: &[i32],
313 ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
314 let mut map = HashMap::new();
315 for partition in partitions {
316 let (low, high) = self
317 .client
318 .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
319 .await?;
320 self.report_high_watermark(*partition, high);
321 map.insert(*partition, (low, high));
322 }
323 tracing::debug!("fetch kafka watermarks: {map:?}");
324 Ok(map)
325 }
326
327 pub async fn list_splits_batch(
328 &mut self,
329 expect_start_timestamp_millis: Option<i64>,
330 expect_stop_timestamp_millis: Option<i64>,
331 ) -> ConnectorResult<Vec<KafkaSplit>> {
332 let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
333 format!(
334 "failed to fetch metadata from kafka ({})",
335 self.broker_address
336 )
337 })?;
338
339 let mut watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
342
343 let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
348 Some(
349 self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
350 .await?,
351 )
352 } else {
353 None
354 };
355
356 let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
357 Some(
358 self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
359 .await?,
360 )
361 } else {
362 None
363 };
364
365 Ok(topic_partitions
366 .iter()
367 .map(|partition| {
368 let (low, high) = watermarks.remove(partition).unwrap();
369 let start_offset = {
370 let earliest_offset = low - 1;
371 let start = expect_start_offset
372 .as_mut()
373 .map(|m| m.remove(partition).flatten().unwrap_or(earliest_offset))
374 .unwrap_or(earliest_offset);
375 i64::max(start, earliest_offset)
376 };
377 let stop_offset = {
378 let stop = expect_stop_offset
379 .as_mut()
380 .map(|m| m.remove(partition).unwrap_or(Some(high)))
381 .unwrap_or(Some(high))
382 .unwrap_or(high);
383 i64::min(stop, high)
384 };
385
386 if start_offset > stop_offset {
387 tracing::warn!(
388 "Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
389 self.topic,
390 partition,
391 start_offset,
392 stop_offset
393 );
394 }
395 KafkaSplit {
396 topic: self.topic.clone(),
397 partition: *partition,
398 start_offset: Some(start_offset),
399 stop_offset: Some(stop_offset),
400 }
401 })
402 .collect::<Vec<KafkaSplit>>())
403 }
404
405 async fn fetch_stop_offset(
406 &self,
407 partitions: &[i32],
408 watermarks: &HashMap<i32, (i64, i64)>,
409 ) -> KafkaResult<HashMap<i32, Option<i64>>> {
410 match self.stop_offset {
411 KafkaEnumeratorOffset::Earliest => unreachable!(),
412 KafkaEnumeratorOffset::Latest => {
413 let mut map = HashMap::new();
414 for partition in partitions {
415 let (_, high_watermark) = watermarks.get(partition).unwrap();
416 map.insert(*partition, Some(*high_watermark));
417 }
418 Ok(map)
419 }
420 KafkaEnumeratorOffset::Timestamp(time) => {
421 self.fetch_offset_for_time(partitions, time, watermarks)
422 .await
423 }
424 KafkaEnumeratorOffset::None => partitions
425 .iter()
426 .map(|partition| Ok((*partition, None)))
427 .collect(),
428 }
429 }
430
431 async fn fetch_start_offset(
432 &self,
433 partitions: &[i32],
434 watermarks: &HashMap<i32, (i64, i64)>,
435 ) -> KafkaResult<HashMap<i32, Option<i64>>> {
436 match self.start_offset {
437 KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
438 let mut map = HashMap::new();
439 for partition in partitions {
440 let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
441 let offset = match self.start_offset {
442 KafkaEnumeratorOffset::Earliest => low_watermark - 1,
443 KafkaEnumeratorOffset::Latest => high_watermark - 1,
444 _ => unreachable!(),
445 };
446 map.insert(*partition, Some(offset));
447 }
448 Ok(map)
449 }
450 KafkaEnumeratorOffset::Timestamp(time) => {
451 self.fetch_offset_for_time(partitions, time, watermarks)
452 .await
453 }
454 KafkaEnumeratorOffset::None => partitions
455 .iter()
456 .map(|partition| Ok((*partition, None)))
457 .collect(),
458 }
459 }
460
461 async fn fetch_offset_for_time(
462 &self,
463 partitions: &[i32],
464 time: i64,
465 watermarks: &HashMap<i32, (i64, i64)>,
466 ) -> KafkaResult<HashMap<i32, Option<i64>>> {
467 let mut tpl = TopicPartitionList::new();
468
469 for partition in partitions {
470 tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
471 }
472
473 let offsets = self
474 .client
475 .offsets_for_times(tpl, self.sync_call_timeout)
476 .await?;
477
478 let mut result = HashMap::with_capacity(partitions.len());
479
480 for elem in offsets.elements_for_topic(self.topic.as_str()) {
481 match elem.offset() {
482 Offset::Offset(offset) => {
483 result.insert(elem.partition(), Some(offset - 1));
485 }
486 Offset::End => {
487 let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
488 tracing::info!(
489 source_id = %self.context.info.source_id,
490 "no message found before timestamp {} (ms) for partition {}, start from latest",
491 time,
492 elem.partition()
493 );
494 result.insert(elem.partition(), Some(high_watermark - 1)); }
496 Offset::Invalid => {
497 tracing::info!(
502 source_id = %self.context.info.source_id,
503 "got invalid offset for partition {} at timestamp {}, align to latest",
504 elem.partition(),
505 time
506 );
507 let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
508 result.insert(elem.partition(), Some(high_watermark - 1)); }
510 Offset::Beginning => {
511 let (low, _) = watermarks.get(&elem.partition()).unwrap();
512 tracing::info!(
513 source_id = %self.context.info.source_id,
514 "all message in partition {} is after timestamp {} (ms), start from earliest",
515 elem.partition(),
516 time,
517 );
518 result.insert(elem.partition(), Some(low - 1)); }
520 err_offset @ Offset::Stored | err_offset @ Offset::OffsetTail(_) => {
521 tracing::error!(
522 source_id = %self.context.info.source_id,
523 "got invalid offset for partition {}: {err_offset:?}",
524 elem.partition(),
525 err_offset = err_offset,
526 );
527 return Err(KafkaError::OffsetFetch(RDKafkaErrorCode::NoOffset));
528 }
529 }
530 }
531
532 Ok(result)
533 }
534
535 #[inline]
536 fn report_high_watermark(&mut self, partition: i32, offset: i64) {
537 let high_watermark_metrics =
538 self.high_watermark_metrics
539 .entry(partition)
540 .or_insert_with(|| {
541 self.context
542 .metrics
543 .high_watermark
544 .with_guarded_label_values(&[
545 &self.context.info.source_id.to_string(),
546 &partition.to_string(),
547 ])
548 });
549 high_watermark_metrics.set(offset);
550 }
551
552 pub async fn check_reachability(&self) -> ConnectorResult<()> {
553 let _ = self
554 .client
555 .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
556 .await?;
557 Ok(())
558 }
559
560 async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
561 let metadata = self
563 .client
564 .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
565 .await?;
566
567 let topic_meta = match metadata.topics() {
568 [meta] => meta,
569 _ => bail!("topic {} not found", self.topic),
570 };
571
572 if topic_meta.partitions().is_empty() {
573 bail!("topic {} not found", self.topic);
574 }
575
576 Ok(topic_meta
577 .partitions()
578 .iter()
579 .map(|partition| partition.id())
580 .collect())
581 }
582}