use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use moka::ops::compute::Op;
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
use risingwave_common::bail;
use risingwave_common::metrics::LabelGuardedIntGauge;

use crate::connector_common::read_kafka_log_level;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::SourceEnumeratorContextRef;
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
    RwConsumerContext,
};

type KafkaConsumer = BaseConsumer<RwConsumerContext>;
type KafkaAdmin = AdminClient<RwConsumerContext>;

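/// Consumer clients shared between enumerators that use the same connection properties.
/// Entries hold only `Weak` references, so the cache never keeps a consumer alive on its
/// own; once every enumerator holding the `Arc` is dropped, the next lookup rebuilds it.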
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());
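/// Admin clients shared between enumerators for the same connection. Unlike the consumer
/// cache above, entries hold strong `Arc` references and are evicted after five minutes of
/// idleness via `time_to_idle`.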
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
    LazyLock::new(|| {
        moka::future::Cache::builder()
            .time_to_idle(Duration::from_secs(5 * 60))
            .build()
    });

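/// Where the enumerator should start (or stop) reading each partition, derived from the
/// source's `scan_startup_mode` and `time_offset` properties. The `Timestamp` variant
/// carries a Unix timestamp in milliseconds.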
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
    Earliest,
    Latest,
    Timestamp(i64),
    None,
}

pub struct KafkaSplitEnumerator {
    context: SourceEnumeratorContextRef,
    broker_address: String,
    topic: String,
    client: Arc<KafkaConsumer>,
    start_offset: KafkaEnumeratorOffset,

    stop_offset: KafkaEnumeratorOffset,

    sync_call_timeout: Duration,
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge>,

    properties: KafkaProperties,
    config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {
    /// Delete the consumer groups associated with the given fragments (one group per
    /// fragment id), reusing a cached admin client for this connection when available.
    async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        let admin = Box::pin(SHARED_KAFKA_ADMIN.try_get_with_by_ref(
            &self.properties.connection,
            async {
                tracing::info!("build new kafka admin for {}", self.broker_address);
                Ok(Arc::new(
                    build_kafka_admin(&self.config, &self.properties).await?,
                ))
            },
        ))
        .await?;

        let group_ids = fragment_ids
            .iter()
            .map(|fragment_id| self.properties.group_id(*fragment_id))
            .collect::<Vec<_>>();
        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
        let res = admin
            .delete_groups(&group_ids, &AdminOptions::default())
            .await?;
        tracing::debug!(
            topic = self.topic,
            ?fragment_ids,
            "delete groups result: {res:?}"
        );
        Ok(())
    }
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    async fn new(
        properties: KafkaProperties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<KafkaSplitEnumerator> {
        let mut config = rdkafka::ClientConfig::new();
        let common_props = &properties.common;

        let broker_address = properties.connection.brokers.clone();
        let topic = common_props.topic.clone();
        config.set("bootstrap.servers", &broker_address);
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);
        let mut scan_start_offset = match properties
            .scan_startup_mode
            .as_ref()
            .map(|s| s.to_lowercase())
            .as_deref()
        {
            Some("earliest") => KafkaEnumeratorOffset::Earliest,
            Some("latest") => KafkaEnumeratorOffset::Latest,
            None => KafkaEnumeratorOffset::Earliest,
            _ => bail!(
                "property `scan_startup_mode` only supports `earliest` and `latest`, or can be left empty"
            ),
        };

        if let Some(s) = &properties.time_offset {
            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
        }

        let mut client: Option<Arc<KafkaConsumer>> = None;
        SHARED_KAFKA_CONSUMER
            .entry_by_ref(&properties.connection)
            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
                if let Some(entry) = maybe_entry {
                    let entry_value = entry.into_value();
                    if let Some(client_) = entry_value.upgrade() {
                        tracing::info!("reuse existing kafka client for {}", broker_address);
                        client = Some(client_);
                        return Ok(Op::Nop);
                    }
                }
                tracing::info!("build new kafka client for {}", broker_address);
                client = Some(build_kafka_client(&config, &properties).await?);
                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
            })
            .await?;

        Ok(Self {
            context,
            broker_address,
            topic,
            client: client.unwrap(),
            start_offset: scan_start_offset,
            stop_offset: KafkaEnumeratorOffset::None,
            sync_call_timeout: properties.common.sync_call_timeout,
            high_watermark_metrics: HashMap::new(),
            properties,
            config,
        })
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
        let mut start_offsets = self
            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let mut stop_offsets = self
            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let ret: Vec<_> = topic_partitions
            .into_iter()
            .map(|partition| KafkaSplit {
                topic: self.topic.clone(),
                partition,
                start_offset: start_offsets.remove(&partition).unwrap(),
                stop_offset: stop_offsets.remove(&partition).unwrap(),
            })
            .collect();

        Ok(ret)
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}
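
// A minimal usage sketch of the enumerator (illustrative only; it assumes the caller has
// already built `props: KafkaProperties` and `ctx: SourceEnumeratorContextRef`, which are
// normally supplied by the source manager):
//
//     let mut enumerator = KafkaSplitEnumerator::new(props, ctx).await?;
//     enumerator.check_reachability().await?;
//     // One `KafkaSplit` per partition, with start/stop offsets resolved from the
//     // configured startup mode.
//     let splits = enumerator.list_splits().await?;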

async fn build_kafka_client(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<Arc<KafkaConsumer>> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaConsumer = config.create_with_context(client_ctx).await?;

    if properties.connection.is_aws_msk_iam() {
        // For MSK IAM (SASL/OAUTHBEARER) authentication, the token is only fetched and
        // refreshed from within `poll`, so poll once here to authenticate the client
        // before it is used for metadata or offset requests.
        #[cfg(not(madsim))]
        client.poll(Duration::from_secs(10));
        #[cfg(madsim)]
        client.poll(Duration::from_secs(10)).await;
    }
    Ok(Arc::new(client))
}

async fn build_kafka_admin(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<KafkaAdmin> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
    Ok(client)
}

impl KafkaSplitEnumerator {
    async fn get_watermarks(
        &mut self,
        partitions: &[i32],
    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
        let mut map = HashMap::new();
        for partition in partitions {
            let (low, high) = self
                .client
                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                .await?;
            self.report_high_watermark(*partition, high);
            map.insert(*partition, (low, high));
        }
        tracing::debug!("fetch kafka watermarks: {map:?}");
        Ok(map)
    }

    /// List splits for a bounded (batch) read, optionally constrained by start/stop
    /// timestamps in milliseconds. The resolved offsets are clamped to the partitions'
    /// current low/high watermarks.
    pub async fn list_splits_batch(
        &mut self,
        expect_start_timestamp_millis: Option<i64>,
        expect_stop_timestamp_millis: Option<i64>,
    ) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let mut watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;

        // If a start timestamp is given, resolve it to per-partition offsets.
        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        Ok(topic_partitions
            .iter()
            .map(|partition| {
                let (low, high) = watermarks.remove(partition).unwrap();
                let start_offset = {
                    let earliest_offset = low - 1;
                    let start = expect_start_offset
                        .as_mut()
                        .map(|m| m.remove(partition).flatten().unwrap_or(earliest_offset))
                        .unwrap_or(earliest_offset);
                    i64::max(start, earliest_offset)
                };
                let stop_offset = {
                    let stop = expect_stop_offset
                        .as_mut()
                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
                        .unwrap_or(Some(high))
                        .unwrap_or(high);
                    i64::min(stop, high)
                };

                if start_offset > stop_offset {
                    tracing::warn!(
                        "Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
                        self.topic,
                        partition,
                        start_offset,
                        stop_offset
                    );
                }
                KafkaSplit {
                    topic: self.topic.clone(),
                    partition: *partition,
                    start_offset: Some(start_offset),
                    stop_offset: Some(stop_offset),
                }
            })
            .collect::<Vec<KafkaSplit>>())
    }

    async fn fetch_stop_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.stop_offset {
            KafkaEnumeratorOffset::Earliest => unreachable!(),
            KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (_, high_watermark) = watermarks.get(partition).unwrap();
                    map.insert(*partition, Some(*high_watermark));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

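    /// Resolve the per-partition start offsets according to `self.start_offset`. The value
    /// returned for a partition is the offset *just before* the first message to read
    /// (hence `low_watermark - 1` for `Earliest` and `high_watermark - 1` for `Latest`),
    /// i.e. consumption is expected to begin at the following offset.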
    async fn fetch_start_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.start_offset {
            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
                    let offset = match self.start_offset {
                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
                        _ => unreachable!(),
                    };
                    map.insert(*partition, Some(offset));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

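    /// Resolve per-partition offsets for the given timestamp (milliseconds) via
    /// `offsets_for_times`, then normalize the result: a concrete offset is shifted by `-1`
    /// to match the start-offset convention above; `Offset::End` (no message at or after the
    /// timestamp) and `Offset::Invalid` fall back to the high watermark; `Offset::Beginning`
    /// (all messages are newer than the timestamp) falls back to the low watermark.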
    async fn fetch_offset_for_time(
        &self,
        partitions: &[i32],
        time: i64,
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        let mut tpl = TopicPartitionList::new();

        for partition in partitions {
            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
        }

        let offsets = self
            .client
            .offsets_for_times(tpl, self.sync_call_timeout)
            .await?;

        let mut result = HashMap::with_capacity(partitions.len());

        for elem in offsets.elements_for_topic(self.topic.as_str()) {
            match elem.offset() {
                Offset::Offset(offset) => {
                    // `offset` is the first message at or after the timestamp; store the
                    // preceding offset to match the start-offset convention.
                    result.insert(elem.partition(), Some(offset - 1));
                }
                Offset::End => {
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = self.context.info.source_id,
                        "no message found at or after timestamp {} (ms) for partition {}, starting from latest",
                        time,
                        elem.partition()
                    );
                    result.insert(elem.partition(), Some(high_watermark - 1));
                }
                Offset::Invalid => {
                    tracing::info!(
                        source_id = self.context.info.source_id,
                        "got invalid offset for partition {} at timestamp {}, aligning to latest",
                        elem.partition(),
                        time
                    );
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    result.insert(elem.partition(), Some(high_watermark - 1));
                }
                Offset::Beginning => {
                    let (low, _) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = self.context.info.source_id,
                        "all messages in partition {} are after timestamp {} (ms), starting from earliest",
                        elem.partition(),
                        time,
                    );
                    result.insert(elem.partition(), Some(low - 1));
                }
                err_offset @ Offset::Stored | err_offset @ Offset::OffsetTail(_) => {
                    tracing::error!(
                        source_id = self.context.info.source_id,
                        "got unexpected offset for partition {}: {err_offset:?}",
                        elem.partition(),
                        err_offset = err_offset,
                    );
                    return Err(KafkaError::OffsetFetch(RDKafkaErrorCode::NoOffset));
                }
            }
        }

        Ok(result)
    }

    #[inline]
    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
        let high_watermark_metrics =
            self.high_watermark_metrics
                .entry(partition)
                .or_insert_with(|| {
                    self.context
                        .metrics
                        .high_watermark
                        .with_guarded_label_values(&[
                            &self.context.info.source_id.to_string(),
                            &partition.to_string(),
                        ])
                });
        high_watermark_metrics.set(offset);
    }

    pub async fn check_reachability(&self) -> ConnectorResult<()> {
        let _ = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;
        Ok(())
    }

    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
        let metadata = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;

        let topic_meta = match metadata.topics() {
            [meta] => meta,
            _ => bail!("topic {} not found", self.topic),
        };

        if topic_meta.partitions().is_empty() {
            bail!("topic {} not found", self.topic);
        }

        Ok(topic_meta
            .partitions()
            .iter()
            .map(|partition| partition.id())
            .collect())
    }
}