1use std::collections::HashMap;
16use std::sync::{Arc, LazyLock, Weak};
17use std::time::Duration;
18
19use anyhow::{Context, anyhow};
20use async_trait::async_trait;
21use moka::future::Cache as MokaCache;
22use moka::ops::compute::Op;
23use rdkafka::admin::{AdminClient, AdminOptions};
24use rdkafka::consumer::{BaseConsumer, Consumer};
25use rdkafka::error::{KafkaError, KafkaResult};
26use rdkafka::types::RDKafkaErrorCode;
27use rdkafka::{ClientConfig, Offset, TopicPartitionList};
28use risingwave_common::bail;
29use risingwave_common::id::FragmentId;
30use risingwave_common::metrics::LabelGuardedIntGauge;
31use thiserror_ext::AsReport;
32
33use crate::connector_common::read_kafka_log_level;
34use crate::error::{ConnectorError, ConnectorResult};
35use crate::source::SourceEnumeratorContextRef;
36use crate::source::base::SplitEnumerator;
37use crate::source::kafka::split::KafkaSplit;
38use crate::source::kafka::{
39 KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
40 RwConsumerContext,
41};
42
/// Consumer type used by the enumerator: a base (non-streaming) consumer with
/// the project's custom consumer context.
type KafkaConsumer = BaseConsumer<RwConsumerContext>;
/// Admin client sharing the same custom context type as the consumer.
type KafkaAdmin = AdminClient<RwConsumerContext>;

/// Process-wide cache of Kafka consumers keyed by connection properties, so
/// enumerators with identical connection settings share one client. Values are
/// `Weak`: once no enumerator holds a strong `Arc`, the consumer is dropped
/// and a later lookup rebuilds it.
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());
/// Process-wide cache of admin clients. Entries are strong `Arc`s and are
/// evicted after 5 minutes of idleness.
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
    LazyLock::new(|| {
        moka::future::Cache::builder()
            .time_to_idle(Duration::from_secs(5 * 60))
            .build()
    });
56
/// Where the enumerator should start or stop reading the topic.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
    /// Use each partition's low watermark.
    Earliest,
    /// Use each partition's high watermark.
    Latest,
    /// Resolve per-partition offsets from a timestamp (milliseconds).
    Timestamp(i64),
    /// No explicit bound: splits carry `None` for this offset (see
    /// `fetch_start_offset` / `fetch_stop_offset`).
    None,
}
64
/// Split enumerator for Kafka sources: discovers the topic's partitions and
/// assigns start/stop offsets to produce `KafkaSplit`s.
pub struct KafkaSplitEnumerator {
    /// Enumerator context (provides metrics and source info).
    context: SourceEnumeratorContextRef,
    /// Bootstrap broker list; also used in log/error messages.
    broker_address: String,
    /// The single topic this enumerator covers.
    topic: String,
    /// Shared consumer used for metadata / watermark / offset queries.
    client: Arc<KafkaConsumer>,
    /// Where splits start reading (from `scan_startup_mode` / `time_offset`).
    start_offset: KafkaEnumeratorOffset,

    /// Where splits stop reading; `None` in the streaming case (unbounded).
    stop_offset: KafkaEnumeratorOffset,

    /// Timeout applied to synchronous client calls (metadata, watermarks, ...).
    sync_call_timeout: Duration,
    /// Lazily-created high-watermark gauge, one per partition.
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge>,

    /// Original properties and derived client config, kept so an admin client
    /// can be built later (e.g. to drop consumer groups).
    properties: KafkaProperties,
    config: rdkafka::ClientConfig,
}
81
82impl KafkaSplitEnumerator {
83 async fn drop_consumer_groups(&self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
84 let admin = Box::pin(SHARED_KAFKA_ADMIN.try_get_with_by_ref(
85 &self.properties.connection,
86 async {
87 tracing::info!("build new kafka admin for {}", self.broker_address);
88 Ok(Arc::new(
89 build_kafka_admin(&self.config, &self.properties).await?,
90 ))
91 },
92 ))
93 .await?;
94
95 let group_ids = fragment_ids
96 .iter()
97 .map(|fragment_id| self.properties.group_id(*fragment_id))
98 .collect::<Vec<_>>();
99 let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
100 let res = admin
101 .delete_groups(&group_ids, &AdminOptions::default())
102 .await?;
103 tracing::debug!(
104 topic = self.topic,
105 ?fragment_ids,
106 "delete groups result: {res:?}"
107 );
108 Ok(())
109 }
110}
111
#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    /// Builds an enumerator from user-supplied properties, reusing a
    /// process-wide shared consumer when one exists for the same connection.
    async fn new(
        properties: KafkaProperties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<KafkaSplitEnumerator> {
        let mut config = rdkafka::ClientConfig::new();
        let common_props = &properties.common;

        let broker_address = properties.connection.brokers.clone();
        let topic = common_props.topic.clone();
        config.set("bootstrap.servers", &broker_address);
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);
        // "0" disables periodic statistics emission for this client.
        config.set("statistics.interval.ms", "0");
        // Map `scan_startup_mode` (case-insensitive) to a start offset;
        // leaving it unset defaults to `earliest`.
        let mut scan_start_offset = match properties
            .scan_startup_mode
            .as_ref()
            .map(|s| s.to_lowercase())
            .as_deref()
        {
            Some("earliest") => KafkaEnumeratorOffset::Earliest,
            Some("latest") => KafkaEnumeratorOffset::Latest,
            None => KafkaEnumeratorOffset::Earliest,
            _ => bail!(
                "properties `scan_startup_mode` only supports earliest and latest or leaving it empty"
            ),
        };

        // An explicit `time_offset` (epoch millis, presumably — see the
        // "(ms)" log messages below) overrides `scan_startup_mode`.
        if let Some(s) = &properties.time_offset {
            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
        }

        // Get-or-create the shared consumer atomically: if the cached `Weak`
        // still upgrades, reuse it (Op::Nop leaves the cache untouched);
        // otherwise build a fresh client and store a downgraded handle so
        // later enumerators can share it.
        let mut client: Option<Arc<KafkaConsumer>> = None;
        SHARED_KAFKA_CONSUMER
            .entry_by_ref(&properties.connection)
            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
                if let Some(entry) = maybe_entry {
                    let entry_value = entry.into_value();
                    if let Some(client_) = entry_value.upgrade() {
                        tracing::info!("reuse existing kafka client for {}", broker_address);
                        client = Some(client_);
                        return Ok(Op::Nop);
                    }
                }
                tracing::info!("build new kafka client for {}", broker_address);
                client = Some(build_kafka_client(&config, &properties).await?);
                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
            })
            .await?;

        Ok(Self {
            context,
            broker_address,
            topic,
            // `client` is always set by the compute closure above, on both
            // the reuse and the build path.
            client: client.unwrap(),
            start_offset: scan_start_offset,
            stop_offset: KafkaEnumeratorOffset::None,
            sync_call_timeout: properties.common.sync_call_timeout,
            high_watermark_metrics: HashMap::new(),
            properties,
            config,
        })
    }

    /// Lists one split per partition, with start/stop offsets resolved from
    /// the configured enumerator offsets and current watermarks.
    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
        // Drive the client's event loop once (zero timeout, non-blocking in
        // the non-madsim build) so background errors surface; a poll error is
        // logged but deliberately not fatal.
        if let Some(Err(poll_err)) = {
            #[cfg(not(madsim))]
            {
                self.client.poll(Duration::ZERO)
            }
            #[cfg(madsim)]
            {
                self.client.poll(Duration::ZERO).await
            }
        } {
            tracing::warn!(
                error = %poll_err.as_report(),
                topic = self.topic,
                broker_address = self.broker_address,
                "failed to poll kafka client");
        }

        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
        let mut start_offsets = self
            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let mut stop_offsets = self
            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        // Both maps are keyed by exactly the partitions fetched above, so
        // the unwraps cannot fail.
        let ret: Vec<_> = topic_partitions
            .into_iter()
            .map(|partition| KafkaSplit {
                topic: self.topic.clone(),
                partition,
                start_offset: start_offsets.remove(&partition).unwrap(),
                stop_offset: stop_offsets.remove(&partition).unwrap(),
            })
            .collect();

        Ok(ret)
    }

    /// Cleans up the fragments' consumer groups when they are dropped.
    async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    /// Cleans up the fragments' consumer groups once backfill finishes.
    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}
247
248async fn build_kafka_client(
249 config: &ClientConfig,
250 properties: &KafkaProperties,
251) -> ConnectorResult<Arc<KafkaConsumer>> {
252 let ctx_common = KafkaContextCommon::new(
253 properties.privatelink_common.broker_rewrite_map.clone(),
254 None,
255 None,
256 properties.aws_auth_props.clone(),
257 properties.connection.is_aws_msk_iam(),
258 )
259 .await?;
260 let client_ctx = RwConsumerContext::new(ctx_common);
261 let client: KafkaConsumer = config.create_with_context(client_ctx).await?;
262
263 if properties.connection.is_aws_msk_iam() {
269 #[cfg(not(madsim))]
270 client.poll(Duration::from_secs(10)); #[cfg(madsim)]
272 client.poll(Duration::from_secs(10)).await;
273 }
274 Ok(Arc::new(client))
275}
276async fn build_kafka_admin(
277 config: &ClientConfig,
278 properties: &KafkaProperties,
279) -> ConnectorResult<KafkaAdmin> {
280 let ctx_common = KafkaContextCommon::new(
281 properties.privatelink_common.broker_rewrite_map.clone(),
282 None,
283 None,
284 properties.aws_auth_props.clone(),
285 properties.connection.is_aws_msk_iam(),
286 )
287 .await?;
288 let client_ctx = RwConsumerContext::new(ctx_common);
289 let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
290 Ok(client)
292}
293
impl KafkaSplitEnumerator {
    /// Fetches each partition's (low, high) watermarks and reports the high
    /// watermark to the per-partition gauge.
    async fn get_watermarks(
        &mut self,
        partitions: &[i32],
    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
        let mut map = HashMap::new();
        for partition in partitions {
            let (low, high) = self
                .client
                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                .await?;
            self.report_high_watermark(*partition, high);
            map.insert(*partition, (low, high));
        }
        tracing::debug!("fetch kafka watermarks: {map:?}");
        Ok(map)
    }

    /// Lists splits bounded by optional start/stop timestamps (epoch millis).
    ///
    /// NOTE(review): the pervasive `- 1` adjustments suggest offsets follow a
    /// "last consumed offset" convention (consumption resumes at offset + 1)
    /// — TODO confirm against the `KafkaSplit` contract.
    pub async fn list_splits_batch(
        &mut self,
        expect_start_timestamp_millis: Option<i64>,
        expect_stop_timestamp_millis: Option<i64>,
    ) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let mut watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;

        // Resolve the requested start timestamp to per-partition offsets.
        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        // Likewise for the requested stop timestamp.
        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        Ok(topic_partitions
            .iter()
            .map(|partition| {
                let (low, high) = watermarks.remove(partition).unwrap();
                // Clamp the requested start to at least `low - 1` (the
                // earliest position); missing/unresolved timestamps fall
                // back to earliest.
                let start_offset = {
                    let earliest_offset = low - 1;
                    let start = expect_start_offset
                        .as_mut()
                        .map(|m| m.remove(partition).flatten().unwrap_or(earliest_offset))
                        .unwrap_or(earliest_offset);
                    i64::max(start, earliest_offset)
                };
                // Clamp the requested stop to at most the high watermark.
                let stop_offset = {
                    let stop = expect_stop_offset
                        .as_mut()
                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
                        .unwrap_or(Some(high))
                        .unwrap_or(high);
                    i64::min(stop, high)
                };

                // NOTE(review): despite the "Skipping" wording, this split is
                // still emitted below — confirm whether it should instead be
                // filtered out of the result.
                if start_offset > stop_offset {
                    tracing::warn!(
                        "Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
                        self.topic,
                        partition,
                        start_offset,
                        stop_offset
                    );
                }
                KafkaSplit {
                    topic: self.topic.clone(),
                    partition: *partition,
                    start_offset: Some(start_offset),
                    stop_offset: Some(stop_offset),
                }
            })
            .collect::<Vec<KafkaSplit>>())
    }

    /// Resolves the per-partition stop offset from `self.stop_offset`.
    /// `Earliest` is never a stop bound, hence `unreachable!()`.
    async fn fetch_stop_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.stop_offset {
            KafkaEnumeratorOffset::Earliest => unreachable!(),
            KafkaEnumeratorOffset::Latest => {
                // Stop at the current high watermark of each partition.
                let mut map = HashMap::new();
                for partition in partitions {
                    let (_, high_watermark) = watermarks.get(partition).unwrap();
                    map.insert(*partition, Some(*high_watermark));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            // No stop bound: splits are unbounded.
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

    /// Resolves the per-partition start offset from `self.start_offset`.
    async fn fetch_start_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.start_offset {
            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
                    // `- 1` so consumption begins at the watermark itself
                    // (presumably "last consumed" semantics — see note on
                    // `list_splits_batch`).
                    let offset = match self.start_offset {
                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
                        _ => unreachable!(),
                    };
                    map.insert(*partition, Some(offset));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            // No explicit start: leave the offset unset.
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

    /// Looks up, per partition, the offset corresponding to `time` (epoch
    /// millis) via `offsets_for_times`, mapping the sentinel `Offset`
    /// variants the broker may return onto the watermarks.
    async fn fetch_offset_for_time(
        &self,
        partitions: &[i32],
        time: i64,
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        let mut tpl = TopicPartitionList::new();

        for partition in partitions {
            // `offsets_for_times` encodes the query timestamp in the offset field.
            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
        }

        let offsets = self
            .client
            .offsets_for_times(tpl, self.sync_call_timeout)
            .await?;

        let mut result = HashMap::with_capacity(partitions.len());

        for elem in offsets.elements_for_topic(self.topic.as_str()) {
            match elem.offset() {
                // Normal case: shift to the "last consumed" convention.
                Offset::Offset(offset) => {
                    result.insert(elem.partition(), Some(offset - 1));
                }
                // No message at/after `time`: start from the latest position.
                Offset::End => {
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = %self.context.info.source_id,
                        "no message found before timestamp {} (ms) for partition {}, start from latest",
                        time,
                        elem.partition()
                    );
                    result.insert(elem.partition(), Some(high_watermark - 1)); }
                // Broker could not resolve an offset: align to latest.
                Offset::Invalid => {
                    tracing::info!(
                        source_id = %self.context.info.source_id,
                        "got invalid offset for partition {} at timestamp {}, align to latest",
                        elem.partition(),
                        time
                    );
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    result.insert(elem.partition(), Some(high_watermark - 1)); }
                // Every message is newer than `time`: start from the earliest.
                Offset::Beginning => {
                    let (low, _) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = %self.context.info.source_id,
                        "all message in partition {} is after timestamp {} (ms), start from earliest",
                        elem.partition(),
                        time,
                    );
                    result.insert(elem.partition(), Some(low - 1)); }
                // These variants are never expected from `offsets_for_times`;
                // treat them as a fetch failure.
                err_offset @ Offset::Stored | err_offset @ Offset::OffsetTail(_) => {
                    tracing::error!(
                        source_id = %self.context.info.source_id,
                        "got invalid offset for partition {}: {err_offset:?}",
                        elem.partition(),
                        err_offset = err_offset,
                    );
                    return Err(KafkaError::OffsetFetch(RDKafkaErrorCode::NoOffset));
                }
            }
        }

        Ok(result)
    }

    /// Records a partition's high watermark in its gauge, creating the
    /// labeled gauge (source_id, partition) on first use.
    #[inline]
    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
        let high_watermark_metrics =
            self.high_watermark_metrics
                .entry(partition)
                .or_insert_with(|| {
                    self.context
                        .metrics
                        .high_watermark
                        .with_guarded_label_values(&[
                            &self.context.info.source_id.to_string(),
                            &partition.to_string(),
                        ])
                });
        high_watermark_metrics.set(offset);
    }

    /// Verifies the broker is reachable by fetching the topic's metadata.
    pub async fn check_reachability(&self) -> ConnectorResult<()> {
        let _ = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;
        Ok(())
    }

    /// Returns the topic's partition ids, failing if the topic is unknown or
    /// reports no partitions.
    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
        let metadata = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;

        // Metadata was requested for exactly one topic; anything else means
        // the topic does not exist.
        let topic_meta = match metadata.topics() {
            [meta] => meta,
            _ => bail!("topic {} not found", self.topic),
        };

        if topic_meta.partitions().is_empty() {
            bail!("topic {} not found", self.topic);
        }

        Ok(topic_meta
            .partitions()
            .iter()
            .map(|partition| partition.id())
            .collect())
    }
}