use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use moka::ops::compute::Op;
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
use risingwave_common::bail;
use risingwave_common::metrics::LabelGuardedIntGauge;

use crate::connector_common::read_kafka_log_level;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::SourceEnumeratorContextRef;
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
    RwConsumerContext,
};

type KafkaConsumer = BaseConsumer<RwConsumerContext>;
type KafkaAdmin = AdminClient<RwConsumerContext>;

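/// Consumers shared across enumerators on the same connection. The cache holds `Weak`
/// references, so a consumer is dropped as soon as the last enumerator using it goes away.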
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());
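/// Admin clients shared likewise, but held by `Arc` with a five-minute idle TTL, so an
/// unused admin client is evicted and dropped after five minutes without access.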
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
    LazyLock::new(|| {
        moka::future::Cache::builder()
            .time_to_idle(Duration::from_secs(5 * 60))
            .build()
    });

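/// Where enumeration starts or stops: at a watermark, at a timestamp in milliseconds, or
/// unspecified (`None`).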
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
    Earliest,
    Latest,
    Timestamp(i64),
    None,
}

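/// Split enumerator for Kafka: discovers the partitions of the configured topic and
/// assigns each one start/stop offsets derived from the startup mode, watermarks, or
/// timestamps.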
pub struct KafkaSplitEnumerator {
    context: SourceEnumeratorContextRef,
    broker_address: String,
    topic: String,
    client: Arc<KafkaConsumer>,
    start_offset: KafkaEnumeratorOffset,

    stop_offset: KafkaEnumeratorOffset,

    sync_call_timeout: Duration,
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge>,

    properties: KafkaProperties,
    config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {
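    /// Deletes the Kafka consumer groups registered by the given fragments, using a
    /// shared admin client that is built lazily per connection.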
    async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        let admin = SHARED_KAFKA_ADMIN
            .try_get_with_by_ref(&self.properties.connection, async {
                tracing::info!("build new kafka admin for {}", self.broker_address);
                Ok(Arc::new(
                    build_kafka_admin(&self.config, &self.properties).await?,
                ))
            })
            .await?;

        let group_ids = fragment_ids
            .iter()
            .map(|fragment_id| self.properties.group_id(*fragment_id))
            .collect::<Vec<_>>();
        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
        let res = admin
            .delete_groups(&group_ids, &AdminOptions::default())
            .await?;
        tracing::debug!(
            topic = self.topic,
            ?fragment_ids,
            "delete groups result: {res:?}"
        );
        Ok(())
    }
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    async fn new(
        properties: KafkaProperties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<KafkaSplitEnumerator> {
        let mut config = rdkafka::ClientConfig::new();
        let common_props = &properties.common;

        let broker_address = properties.connection.brokers.clone();
        let topic = common_props.topic.clone();
        config.set("bootstrap.servers", &broker_address);
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);
        let mut scan_start_offset = match properties
            .scan_startup_mode
            .as_ref()
            .map(|s| s.to_lowercase())
            .as_deref()
        {
            Some("earliest") => KafkaEnumeratorOffset::Earliest,
            Some("latest") => KafkaEnumeratorOffset::Latest,
            None => KafkaEnumeratorOffset::Earliest,
            _ => bail!(
                "property `scan_startup_mode` only supports `earliest` or `latest`, or can be left empty (defaulting to `earliest`)"
            ),
        };

        if let Some(s) = &properties.time_offset {
            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
        }

        let mut client: Option<Arc<KafkaConsumer>> = None;
        SHARED_KAFKA_CONSUMER
            .entry_by_ref(&properties.connection)
            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
                if let Some(entry) = maybe_entry {
                    let entry_value = entry.into_value();
                    if let Some(client_) = entry_value.upgrade() {
                        // Reuse the cached consumer as long as some enumerator still holds it.
                        tracing::info!("reuse existing kafka client for {}", broker_address);
                        client = Some(client_);
                        return Ok(Op::Nop);
                    }
                }
                tracing::info!("build new kafka client for {}", broker_address);
                client = Some(build_kafka_client(&config, &properties).await?);
                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
            })
            .await?;

        Ok(Self {
            context,
            broker_address,
            topic,
            client: client.unwrap(),
            start_offset: scan_start_offset,
            stop_offset: KafkaEnumeratorOffset::None,
            sync_call_timeout: properties.common.sync_call_timeout,
            high_watermark_metrics: HashMap::new(),
            properties,
            config,
        })
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
        let mut start_offsets = self
            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let mut stop_offsets = self
            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let ret: Vec<_> = topic_partitions
            .into_iter()
            .map(|partition| KafkaSplit {
                topic: self.topic.clone(),
                partition,
                start_offset: start_offsets.remove(&partition).unwrap(),
                stop_offset: stop_offsets.remove(&partition).unwrap(),
            })
            .collect();

        Ok(ret)
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}

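/// Builds a `BaseConsumer` wrapped in the RisingWave consumer context, applying broker
/// rewrites, security settings, and (optionally) AWS MSK IAM authentication.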
async fn build_kafka_client(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<Arc<KafkaConsumer>> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaConsumer = config.create_with_context(client_ctx).await?;

    if properties.connection.is_aws_msk_iam() {
        // For SASL/OAUTHBEARER (AWS MSK IAM), an initial token must be retrieved before
        // any broker connection can succeed; polling the consumer once triggers that
        // token retrieval.
        #[cfg(not(madsim))]
        client.poll(Duration::from_secs(10));
        #[cfg(madsim)]
        client.poll(Duration::from_secs(10)).await;
    }
    Ok(Arc::new(client))
}
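
/// Builds an `AdminClient` with the same RisingWave consumer context (broker rewrites,
/// security settings) as [`build_kafka_client`].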
async fn build_kafka_admin(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<KafkaAdmin> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
    Ok(client)
}

impl KafkaSplitEnumerator {
    async fn get_watermarks(
        &mut self,
        partitions: &[i32],
    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
        let mut map = HashMap::new();
        for partition in partitions {
            let (low, high) = self
                .client
                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                .await?;
            self.report_high_watermark(*partition, high);
            map.insert(*partition, (low, high));
        }
        tracing::debug!("fetch kafka watermarks: {map:?}");
        Ok(map)
    }

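    /// Lists splits for a bounded (batch) read: per-partition start/stop offsets are
    /// resolved from the given timestamps (in milliseconds) when present, and otherwise
    /// default to the low and high watermarks. A sketch of the call shape, with
    /// placeholder timestamp variables:
    ///
    /// ```ignore
    /// let splits = enumerator
    ///     .list_splits_batch(Some(start_ms), Some(stop_ms))
    ///     .await?;
    /// ```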
    pub async fn list_splits_batch(
        &mut self,
        expect_start_timestamp_millis: Option<i64>,
        expect_stop_timestamp_millis: Option<i64>,
    ) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let mut watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;

        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        Ok(topic_partitions
            .iter()
            .map(|partition| {
                let (low, high) = watermarks.remove(partition).unwrap();
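                // Note: split offsets appear to record the *last consumed* offset, hence
                // `low - 1` for "start from the first available message"; both bounds are
                // clamped to the [low - 1, high] watermark range.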
                let start_offset = {
                    let earliest_offset = low - 1;
                    let start = expect_start_offset
                        .as_mut()
                        .map(|m| m.remove(partition).flatten().unwrap_or(earliest_offset))
                        .unwrap_or(earliest_offset);
                    i64::max(start, earliest_offset)
                };
                let stop_offset = {
                    let stop = expect_stop_offset
                        .as_mut()
                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
                        .unwrap_or(Some(high))
                        .unwrap_or(high);
                    i64::min(stop, high)
                };

                if start_offset > stop_offset {
                    tracing::warn!(
                        "Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
                        self.topic,
                        partition,
                        start_offset,
                        stop_offset
                    );
                }
                KafkaSplit {
                    topic: self.topic.clone(),
                    partition: *partition,
                    start_offset: Some(start_offset),
                    stop_offset: Some(stop_offset),
                }
            })
            .collect::<Vec<KafkaSplit>>())
    }

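    /// Resolves the stop offset for each partition from `self.stop_offset`: the high
    /// watermark for `Latest`, a timestamp lookup for `Timestamp`, and `None` (unbounded)
    /// when unset. `Earliest` is never a valid stop bound.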
    async fn fetch_stop_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.stop_offset {
            KafkaEnumeratorOffset::Earliest => unreachable!(),
            KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (_, high_watermark) = watermarks.get(partition).unwrap();
                    map.insert(*partition, Some(*high_watermark));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

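    /// Resolves the start offset for each partition from `self.start_offset`.
    /// Watermark-based offsets are shifted down by one, matching the "last consumed
    /// offset" convention used throughout this file.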
    async fn fetch_start_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.start_offset {
            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
                    let offset = match self.start_offset {
                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
                        _ => unreachable!(),
                    };
                    map.insert(*partition, Some(offset));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

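    /// Looks up, for each partition, the offset of the first message with a timestamp at
    /// or after `time` (in milliseconds) via `offsets_for_times`, falling back to the
    /// watermarks when the broker reports `Beginning`, `End`, or `Invalid`.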
    async fn fetch_offset_for_time(
        &self,
        partitions: &[i32],
        time: i64,
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        let mut tpl = TopicPartitionList::new();

        for partition in partitions {
            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
        }

        let offsets = self
            .client
            .offsets_for_times(tpl, self.sync_call_timeout)
            .await?;

        let mut result = HashMap::with_capacity(partitions.len());

        for elem in offsets.elements_for_topic(self.topic.as_str()) {
            match elem.offset() {
                Offset::Offset(offset) => {
                    // `offset` is the first message at or after the timestamp; store the
                    // one before it, since split offsets record the last consumed offset.
                    result.insert(elem.partition(), Some(offset - 1));
                }
                Offset::End => {
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = self.context.info.source_id,
                        "no message found before timestamp {} (ms) for partition {}, starting from latest",
                        time,
                        elem.partition()
                    );
                    result.insert(elem.partition(), Some(high_watermark - 1));
                }
                Offset::Invalid => {
                    tracing::info!(
                        source_id = self.context.info.source_id,
                        "got invalid offset for partition {} at timestamp {}, aligning to latest",
                        elem.partition(),
                        time
                    );
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    result.insert(elem.partition(), Some(high_watermark - 1));
                }
                Offset::Beginning => {
                    let (low, _) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = self.context.info.source_id,
                        "all messages in partition {} are after timestamp {} (ms), starting from earliest",
                        elem.partition(),
                        time,
                    );
                    result.insert(elem.partition(), Some(low - 1));
                }
                err_offset @ Offset::Stored | err_offset @ Offset::OffsetTail(_) => {
                    tracing::error!(
                        source_id = self.context.info.source_id,
                        "got unexpected offset for partition {}: {err_offset:?}",
                        elem.partition(),
                        err_offset = err_offset,
                    );
                    return Err(KafkaError::OffsetFetch(RDKafkaErrorCode::NoOffset));
                }
            }
        }

        Ok(result)
    }

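    /// Reports `offset` as the partition's high watermark on the `high_watermark` gauge,
    /// creating the per-partition labeled gauge on first use.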
    #[inline]
    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
        let high_watermark_metrics =
            self.high_watermark_metrics
                .entry(partition)
                .or_insert_with(|| {
                    self.context
                        .metrics
                        .high_watermark
                        .with_guarded_label_values(&[
                            &self.context.info.source_id.to_string(),
                            &partition.to_string(),
                        ])
                });
        high_watermark_metrics.set(offset);
    }

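    /// Probes connectivity by fetching topic metadata within the sync-call timeout.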
    pub async fn check_reachability(&self) -> bool {
        self.client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await
            .is_ok()
    }

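    /// Fetches the partition ids of the configured topic, failing if the topic is missing
    /// or reports no partitions.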
    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
        let metadata = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;

        let topic_meta = match metadata.topics() {
            [meta] => meta,
            _ => bail!("topic {} not found", self.topic),
        };

        if topic_meta.partitions().is_empty() {
            bail!("topic {} not found", self.topic);
        }

        Ok(topic_meta
            .partitions()
            .iter()
            .map(|partition| partition.id())
            .collect())
    }
}