use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use moka::ops::compute::Op;
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
use risingwave_common::bail;
use risingwave_common::id::FragmentId;
use risingwave_common::metrics::LabelGuardedIntGauge;

use crate::connector_common::read_kafka_log_level;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::SourceEnumeratorContextRef;
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
    RwConsumerContext,
};

type KafkaConsumer = BaseConsumer<RwConsumerContext>;
type KafkaAdmin = AdminClient<RwConsumerContext>;

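/// Shared consumer clients, keyed by connection properties. The cache stores `Weak` handles,
/// so it never keeps a consumer alive on its own: once all enumerators drop their `Arc`, the
/// client is destroyed and the next lookup rebuilds it.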
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());
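/// Shared admin clients. Unlike the consumers above, these are held as `Arc`, so the cache
/// itself keeps them alive; `time_to_idle` evicts a client after five idle minutes.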
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
    LazyLock::new(|| {
        moka::future::Cache::builder()
            .time_to_idle(Duration::from_secs(5 * 60))
            .build()
    });

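/// Start or stop position used by the enumerator when assigning split offsets. `None` leaves
/// the offset unset so that readers fall back to their own defaults.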
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
    Earliest,
    Latest,
    Timestamp(i64),
    None,
}

pub struct KafkaSplitEnumerator {
    context: SourceEnumeratorContextRef,
    broker_address: String,
    topic: String,
    client: Arc<KafkaConsumer>,
    start_offset: KafkaEnumeratorOffset,

    stop_offset: KafkaEnumeratorOffset,

    sync_call_timeout: Duration,
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge>,

    properties: KafkaProperties,
    config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {
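    /// Deletes the consumer groups derived from the given fragment ids, using an admin client
    /// shared through `SHARED_KAFKA_ADMIN`.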
    async fn drop_consumer_groups(&self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
        let admin = Box::pin(SHARED_KAFKA_ADMIN.try_get_with_by_ref(
            &self.properties.connection,
            async {
                tracing::info!("build new kafka admin for {}", self.broker_address);
                Ok(Arc::new(
                    build_kafka_admin(&self.config, &self.properties).await?,
                ))
            },
        ))
        .await?;

        let group_ids = fragment_ids
            .iter()
            .map(|fragment_id| self.properties.group_id(*fragment_id))
            .collect::<Vec<_>>();
        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
        let res = admin
            .delete_groups(&group_ids, &AdminOptions::default())
            .await?;
        tracing::debug!(
            topic = self.topic,
            ?fragment_ids,
            "delete groups result: {res:?}"
        );
        Ok(())
    }
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    async fn new(
        properties: KafkaProperties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<KafkaSplitEnumerator> {
        let mut config = rdkafka::ClientConfig::new();
        let common_props = &properties.common;

        let broker_address = properties.connection.brokers.clone();
        let topic = common_props.topic.clone();
        config.set("bootstrap.servers", &broker_address);
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);
        let mut scan_start_offset = match properties
            .scan_startup_mode
            .as_ref()
            .map(|s| s.to_lowercase())
            .as_deref()
        {
            Some("earliest") => KafkaEnumeratorOffset::Earliest,
            Some("latest") => KafkaEnumeratorOffset::Latest,
            None => KafkaEnumeratorOffset::Earliest,
            _ => bail!(
                "property `scan_startup_mode` only supports `earliest` and `latest`, or leaving it unset"
            ),
        };

        if let Some(s) = &properties.time_offset {
            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset);
        }

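        // Share one consumer per connection: try to upgrade the cached `Weak` handle first; if
        // every previous owner is gone, build a fresh client and cache a downgraded reference.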
        let mut client: Option<Arc<KafkaConsumer>> = None;
        SHARED_KAFKA_CONSUMER
            .entry_by_ref(&properties.connection)
            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
                if let Some(entry) = maybe_entry {
                    let entry_value = entry.into_value();
                    if let Some(client_) = entry_value.upgrade() {
                        tracing::info!("reuse existing kafka client for {}", broker_address);
                        client = Some(client_);
                        return Ok(Op::Nop);
                    }
                }
                tracing::info!("build new kafka client for {}", broker_address);
                client = Some(build_kafka_client(&config, &properties).await?);
                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
            })
            .await?;

        Ok(Self {
            context,
            broker_address,
            topic,
            client: client.unwrap(),
            start_offset: scan_start_offset,
            stop_offset: KafkaEnumeratorOffset::None,
            sync_call_timeout: properties.common.sync_call_timeout,
            high_watermark_metrics: HashMap::new(),
            properties,
            config,
        })
    }

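    /// Lists one split per partition, resolving start and stop offsets from the configured
    /// enumerator offsets and the freshly fetched watermarks.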
    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
        let mut start_offsets = self
            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let mut stop_offsets = self
            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let ret: Vec<_> = topic_partitions
            .into_iter()
            .map(|partition| KafkaSplit {
                topic: self.topic.clone(),
                partition,
                start_offset: start_offsets.remove(&partition).unwrap(),
                stop_offset: stop_offsets.remove(&partition).unwrap(),
            })
            .collect();

        Ok(ret)
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}

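/// Builds a Kafka consumer client wired up with the RisingWave consumer context (broker
/// rewrite map, AWS auth, MSK IAM detection).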
async fn build_kafka_client(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<Arc<KafkaConsumer>> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaConsumer = config.create_with_context(client_ctx).await?;

    if properties.connection.is_aws_msk_iam() {
        #[cfg(not(madsim))]
        client.poll(Duration::from_secs(10));
        #[cfg(madsim)]
        client.poll(Duration::from_secs(10)).await;
    }
    Ok(Arc::new(client))
}

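/// Builds a Kafka admin client with the same context setup as the consumer; used for
/// consumer-group cleanup.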
async fn build_kafka_admin(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<KafkaAdmin> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
    Ok(client)
}

impl KafkaSplitEnumerator {
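    /// Fetches the (low, high) watermarks of each partition and reports the high watermark to
    /// the metrics.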
    async fn get_watermarks(
        &mut self,
        partitions: &[i32],
    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
        let mut map = HashMap::new();
        for partition in partitions {
            let (low, high) = self
                .client
                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                .await?;
            self.report_high_watermark(*partition, high);
            map.insert(*partition, (low, high));
        }
        tracing::debug!("fetch kafka watermarks: {map:?}");
        Ok(map)
    }

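    /// Lists splits for a bounded (batch) read. Start and stop offsets come from the given
    /// timestamps when present, clamped to the partition watermarks otherwise.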
    pub async fn list_splits_batch(
        &mut self,
        expect_start_timestamp_millis: Option<i64>,
        expect_stop_timestamp_millis: Option<i64>,
    ) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let mut watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;

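        // When a timestamp bound is given, resolve it to per-partition offsets via
        // `offsets_for_times`; otherwise the watermarks below serve as the bounds.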
        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts, &watermarks)
                    .await?,
            )
        } else {
            None
        };

        Ok(topic_partitions
            .iter()
            .map(|partition| {
                let (low, high) = watermarks.remove(partition).unwrap();
                let start_offset = {
                    let earliest_offset = low - 1;
                    let start = expect_start_offset
                        .as_mut()
                        .map(|m| m.remove(partition).flatten().unwrap_or(earliest_offset))
                        .unwrap_or(earliest_offset);
                    i64::max(start, earliest_offset)
                };
                let stop_offset = {
                    let stop = expect_stop_offset
                        .as_mut()
                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
                        .unwrap_or(Some(high))
                        .unwrap_or(high);
                    i64::min(stop, high)
                };

                if start_offset > stop_offset {
                    tracing::warn!(
                        "Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
                        self.topic,
                        partition,
                        start_offset,
                        stop_offset
                    );
                }
                KafkaSplit {
                    topic: self.topic.clone(),
                    partition: *partition,
                    start_offset: Some(start_offset),
                    stop_offset: Some(stop_offset),
                }
            })
            .collect::<Vec<KafkaSplit>>())
    }

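    /// Resolves per-partition stop offsets for the configured `stop_offset`. `Earliest` is not
    /// a meaningful stop bound and is therefore unreachable.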
    async fn fetch_stop_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.stop_offset {
            KafkaEnumeratorOffset::Earliest => unreachable!(),
            KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (_, high_watermark) = watermarks.get(partition).unwrap();
                    map.insert(*partition, Some(*high_watermark));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

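    /// Resolves per-partition start offsets. Split offsets follow the "last consumed offset"
    /// convention, so `Earliest` maps to `low - 1` and `Latest` to `high - 1`, making readers
    /// begin at the watermark itself.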
    async fn fetch_start_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.start_offset {
            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
                    let offset = match self.start_offset {
                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
                        _ => unreachable!(),
                    };
                    map.insert(*partition, Some(offset));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time, watermarks)
                    .await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

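    /// Looks up, for each partition, the earliest offset whose timestamp is at or after `time`
    /// (in ms), aligning sentinel results (`End`, `Beginning`, `Invalid`) to the watermarks.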
    async fn fetch_offset_for_time(
        &self,
        partitions: &[i32],
        time: i64,
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        let mut tpl = TopicPartitionList::new();

        for partition in partitions {
            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
        }

        let offsets = self
            .client
            .offsets_for_times(tpl, self.sync_call_timeout)
            .await?;

        let mut result = HashMap::with_capacity(partitions.len());

        for elem in offsets.elements_for_topic(self.topic.as_str()) {
            match elem.offset() {
                Offset::Offset(offset) => {
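                    // Split offsets are interpreted as the last consumed offset, so step back
                    // by one to have the message at the resolved offset consumed first.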
                    result.insert(elem.partition(), Some(offset - 1));
                }
                Offset::End => {
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = %self.context.info.source_id,
                        "no message found before timestamp {} (ms) for partition {}, start from latest",
                        time,
                        elem.partition()
                    );
                    result.insert(elem.partition(), Some(high_watermark - 1));
                }
                Offset::Invalid => {
                    tracing::info!(
                        source_id = %self.context.info.source_id,
                        "got invalid offset for partition {} at timestamp {}, align to latest",
                        elem.partition(),
                        time
                    );
                    let (_, high_watermark) = watermarks.get(&elem.partition()).unwrap();
                    result.insert(elem.partition(), Some(high_watermark - 1));
                }
                Offset::Beginning => {
                    let (low, _) = watermarks.get(&elem.partition()).unwrap();
                    tracing::info!(
                        source_id = %self.context.info.source_id,
                        "all messages in partition {} are after timestamp {} (ms), start from earliest",
                        elem.partition(),
                        time,
                    );
                    result.insert(elem.partition(), Some(low - 1));
                }
                err_offset @ Offset::Stored | err_offset @ Offset::OffsetTail(_) => {
                    tracing::error!(
                        source_id = %self.context.info.source_id,
                        "got unexpected offset for partition {}: {err_offset:?}",
                        elem.partition(),
                    );
                    return Err(KafkaError::OffsetFetch(RDKafkaErrorCode::NoOffset));
                }
            }
        }

        Ok(result)
    }

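    /// Reports the high watermark of a partition to the metrics, creating the labeled gauge on
    /// first use.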
    #[inline]
    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
        let high_watermark_metrics =
            self.high_watermark_metrics
                .entry(partition)
                .or_insert_with(|| {
                    self.context
                        .metrics
                        .high_watermark
                        .with_guarded_label_values(&[
                            &self.context.info.source_id.to_string(),
                            &partition.to_string(),
                        ])
                });
        high_watermark_metrics.set(offset);
    }

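    /// Checks broker reachability by fetching metadata for the topic.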
    pub async fn check_reachability(&self) -> ConnectorResult<()> {
        let _ = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;
        Ok(())
    }

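    /// Fetches the partition ids of the topic, bailing out if the topic does not exist or has
    /// no partitions.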
    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
        let metadata = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;

        let topic_meta = match metadata.topics() {
            [meta] => meta,
            _ => bail!("topic {} not found", self.topic),
        };

        if topic_meta.partitions().is_empty() {
            bail!("topic {} not found", self.topic);
        }

        Ok(topic_meta
            .partitions()
            .iter()
            .map(|partition| partition.id())
            .collect())
    }
}