use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use moka::ops::compute::Op;
use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
use risingwave_common::bail;
use risingwave_common::metrics::LabelGuardedIntGauge;

use crate::connector_common::read_kafka_log_level;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::SourceEnumeratorContextRef;
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaConnectionProps, KafkaContextCommon, KafkaProperties,
    RwConsumerContext,
};

type KafkaConsumer = BaseConsumer<RwConsumerContext>;
type KafkaAdmin = AdminClient<RwConsumerContext>;

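// Clients are cached per Kafka connection so that enumerators sharing a connection can reuse
// them. Consumers are stored as `Weak` references, so an entry stays usable only while some
// enumerator still holds the corresponding `Arc`; admin clients are owned by the cache itself
// and evicted after five minutes of inactivity.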
pub static SHARED_KAFKA_CONSUMER: LazyLock<MokaCache<KafkaConnectionProps, Weak<KafkaConsumer>>> =
    LazyLock::new(|| moka::future::Cache::builder().build());
pub static SHARED_KAFKA_ADMIN: LazyLock<MokaCache<KafkaConnectionProps, Arc<KafkaAdmin>>> =
    LazyLock::new(|| {
        moka::future::Cache::builder()
            .time_to_idle(Duration::from_secs(5 * 60))
            .build()
    });

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
    Earliest,
    Latest,
    Timestamp(i64),
    None,
}

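/// Split enumerator for Kafka: discovers the partitions of `topic` and assigns each split a
/// start/stop offset according to the configured scan mode.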
pub struct KafkaSplitEnumerator {
    context: SourceEnumeratorContextRef,
    broker_address: String,
    topic: String,
    client: Arc<KafkaConsumer>,
    start_offset: KafkaEnumeratorOffset,

    stop_offset: KafkaEnumeratorOffset,

    sync_call_timeout: Duration,
    high_watermark_metrics: HashMap<i32, LabelGuardedIntGauge>,

    properties: KafkaProperties,
    config: rdkafka::ClientConfig,
}

impl KafkaSplitEnumerator {
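    /// Deletes the consumer groups registered for the given fragments, using an admin client
    /// that is built lazily and shared per connection.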
    async fn drop_consumer_groups(&self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        let admin = SHARED_KAFKA_ADMIN
            .try_get_with_by_ref(&self.properties.connection, async {
                tracing::info!("build new kafka admin for {}", self.broker_address);
                Ok(Arc::new(
                    build_kafka_admin(&self.config, &self.properties).await?,
                ))
            })
            .await?;

        let group_ids = fragment_ids
            .iter()
            .map(|fragment_id| self.properties.group_id(*fragment_id))
            .collect::<Vec<_>>();
        let group_ids: Vec<&str> = group_ids.iter().map(|s| s.as_str()).collect();
        let res = admin
            .delete_groups(&group_ids, &AdminOptions::default())
            .await?;
        tracing::debug!(
            topic = self.topic,
            ?fragment_ids,
            "delete groups result: {res:?}"
        );
        Ok(())
    }
}

#[async_trait]
impl SplitEnumerator for KafkaSplitEnumerator {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    async fn new(
        properties: KafkaProperties,
        context: SourceEnumeratorContextRef,
    ) -> ConnectorResult<KafkaSplitEnumerator> {
        let mut config = rdkafka::ClientConfig::new();
        let common_props = &properties.common;

        let broker_address = properties.connection.brokers.clone();
        let topic = common_props.topic.clone();
        config.set("bootstrap.servers", &broker_address);
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);
        let mut scan_start_offset = match properties
            .scan_startup_mode
            .as_ref()
            .map(|s| s.to_lowercase())
            .as_deref()
        {
            Some("earliest") => KafkaEnumeratorOffset::Earliest,
            Some("latest") => KafkaEnumeratorOffset::Latest,
            None => KafkaEnumeratorOffset::Earliest,
            _ => bail!(
                "properties `scan_startup_mode` only supports earliest and latest or leaving it empty"
            ),
        };

        if let Some(s) = &properties.time_offset {
            let time_offset = s.parse::<i64>().map_err(|e| anyhow!(e))?;
            scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
        }

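        // Reuse a cached consumer for this connection if one is still alive; otherwise build a
        // new one and store only a `Weak` handle, so the cache never keeps a client alive by
        // itself.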
        let mut client: Option<Arc<KafkaConsumer>> = None;
        SHARED_KAFKA_CONSUMER
            .entry_by_ref(&properties.connection)
            .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async {
                if let Some(entry) = maybe_entry {
                    let entry_value = entry.into_value();
                    if let Some(client_) = entry_value.upgrade() {
                        tracing::info!("reuse existing kafka client for {}", broker_address);
                        client = Some(client_);
                        return Ok(Op::Nop);
                    }
                }
                tracing::info!("build new kafka client for {}", broker_address);
                client = Some(build_kafka_client(&config, &properties).await?);
                Ok(Op::Put(Arc::downgrade(client.as_ref().unwrap())))
            })
            .await?;

        Ok(Self {
            context,
            broker_address,
            topic,
            client: client.unwrap(),
            start_offset: scan_start_offset,
            stop_offset: KafkaEnumeratorOffset::None,
            sync_call_timeout: properties.common.sync_call_timeout,
            high_watermark_metrics: HashMap::new(),
            properties,
            config,
        })
    }

    async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
        let mut start_offsets = self
            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let mut stop_offsets = self
            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
            .await?;

        let ret: Vec<_> = topic_partitions
            .into_iter()
            .map(|partition| KafkaSplit {
                topic: self.topic.clone(),
                partition,
                start_offset: start_offsets.remove(&partition).unwrap(),
                stop_offset: stop_offsets.remove(&partition).unwrap(),
            })
            .collect();

        Ok(ret)
    }

    async fn on_drop_fragments(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }

    async fn on_finish_backfill(&mut self, fragment_ids: Vec<u32>) -> ConnectorResult<()> {
        self.drop_consumer_groups(fragment_ids).await
    }
}

async fn build_kafka_client(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<Arc<KafkaConsumer>> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaConsumer = config.create_with_context(client_ctx).await?;

    if properties.connection.is_aws_msk_iam() {
        // librdkafka requires a poll to trigger retrieval of the initial SASL/OAUTHBEARER
        // token before the client can connect to the brokers (relevant for AWS MSK IAM).
        #[cfg(not(madsim))]
        client.poll(Duration::from_secs(10));
        #[cfg(madsim)]
        client.poll(Duration::from_secs(10)).await;
    }
    Ok(Arc::new(client))
}
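
/// Builds an admin client carrying the same broker-rewrite and security context as the
/// consumer; used here for deleting consumer groups.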
async fn build_kafka_admin(
    config: &ClientConfig,
    properties: &KafkaProperties,
) -> ConnectorResult<KafkaAdmin> {
    let ctx_common = KafkaContextCommon::new(
        properties.privatelink_common.broker_rewrite_map.clone(),
        None,
        None,
        properties.aws_auth_props.clone(),
        properties.connection.is_aws_msk_iam(),
    )
    .await?;
    let client_ctx = RwConsumerContext::new(ctx_common);
    let client: KafkaAdmin = config.create_with_context(client_ctx).await?;
    Ok(client)
}

impl KafkaSplitEnumerator {
    async fn get_watermarks(
        &mut self,
        partitions: &[i32],
    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
        let mut map = HashMap::new();
        for partition in partitions {
            let (low, high) = self
                .client
                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                .await?;
            self.report_high_watermark(*partition, high);
            map.insert(*partition, (low, high));
        }
        tracing::debug!("fetch kafka watermarks: {map:?}");
        Ok(map)
    }

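    /// Lists splits whose offset ranges are bounded by the given timestamps (in milliseconds).
    /// Each timestamp is resolved to a per-partition offset via `fetch_offset_for_time` and then
    /// clamped to that partition's watermarks; a missing bound falls back to the low/high
    /// watermark. Start offsets follow the exclusive convention used throughout this file
    /// (hence the `- 1` adjustments).
    ///
    /// A rough usage sketch, with hypothetical timestamp values and `enumerator` being an
    /// already-constructed `KafkaSplitEnumerator`:
    ///
    /// ```ignore
    /// // Splits covering roughly the last hour, up to the current high watermarks.
    /// let now_ms = 1_700_000_000_000_i64;
    /// let splits = enumerator
    ///     .list_splits_batch(Some(now_ms - 3_600_000), None)
    ///     .await?;
    /// ```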
    pub async fn list_splits_batch(
        &mut self,
        expect_start_timestamp_millis: Option<i64>,
        expect_stop_timestamp_millis: Option<i64>,
    ) -> ConnectorResult<Vec<KafkaSplit>> {
        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
            format!(
                "failed to fetch metadata from kafka ({})",
                self.broker_address
            )
        })?;

        let mut expect_start_offset = if let Some(ts) = expect_start_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts)
                    .await?,
            )
        } else {
            None
        };

        let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
            Some(
                self.fetch_offset_for_time(topic_partitions.as_ref(), ts)
                    .await?,
            )
        } else {
            None
        };

        let mut watermarks = {
            let mut ret = HashMap::new();
            for partition in &topic_partitions {
                let (low, high) = self
                    .client
                    .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
                    .await?;
                ret.insert(partition, (low - 1, high));
            }
            ret
        };

        Ok(topic_partitions
            .iter()
            .map(|partition| {
                let (low, high) = watermarks.remove(&partition).unwrap();
                let start_offset = {
                    let start = expect_start_offset
                        .as_mut()
                        .map(|m| m.remove(partition).flatten().map(|t| t - 1).unwrap_or(low))
                        .unwrap_or(low);
                    i64::max(start, low)
                };
                let stop_offset = {
                    let stop = expect_stop_offset
                        .as_mut()
                        .map(|m| m.remove(partition).unwrap_or(Some(high)))
                        .unwrap_or(Some(high))
                        .unwrap_or(high);
                    i64::min(stop, high)
                };

                if start_offset > stop_offset {
                    tracing::warn!(
                        "Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
                        self.topic,
                        partition,
                        start_offset,
                        stop_offset
                    );
                }
                KafkaSplit {
                    topic: self.topic.clone(),
                    partition: *partition,
                    start_offset: Some(start_offset),
                    stop_offset: Some(stop_offset),
                }
            })
            .collect::<Vec<KafkaSplit>>())
    }

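    /// Resolves the stop offset for every partition according to `self.stop_offset`; a `None`
    /// value means the split has no stop bound.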
    async fn fetch_stop_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.stop_offset {
            KafkaEnumeratorOffset::Earliest => unreachable!(),
            KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (_, high_watermark) = watermarks.get(partition).unwrap();
                    map.insert(*partition, Some(*high_watermark));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time).await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

    async fn fetch_start_offset(
        &self,
        partitions: &[i32],
        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.start_offset {
            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
                    let offset = match self.start_offset {
                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
                        _ => unreachable!(),
                    };
                    map.insert(*partition, Some(offset));
                }
                Ok(map)
            }
            KafkaEnumeratorOffset::Timestamp(time) => {
                self.fetch_offset_for_time(partitions, time).await
            }
            KafkaEnumeratorOffset::None => partitions
                .iter()
                .map(|partition| Ok((*partition, None)))
                .collect(),
        }
    }

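    /// For each partition, looks up the first offset whose timestamp is at or after `time`
    /// (in milliseconds) and returns it minus one, matching the exclusive start-offset
    /// convention. Partitions with no such message fall back to their current high watermark.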
    async fn fetch_offset_for_time(
        &self,
        partitions: &[i32],
        time: i64,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        let mut tpl = TopicPartitionList::new();

        for partition in partitions {
            tpl.add_partition_offset(self.topic.as_str(), *partition, Offset::Offset(time))?;
        }

        let offsets = self
            .client
            .offsets_for_times(tpl, self.sync_call_timeout)
            .await?;

        let mut result = HashMap::with_capacity(partitions.len());

        for elem in offsets.elements_for_topic(self.topic.as_str()) {
            match elem.offset() {
                Offset::Offset(offset) => {
                    result.insert(elem.partition(), Some(offset - 1));
                }
                _ => {
                    let (_, high_watermark) = self
                        .client
                        .fetch_watermarks(
                            self.topic.as_str(),
                            elem.partition(),
                            self.sync_call_timeout,
                        )
                        .await?;
                    result.insert(elem.partition(), Some(high_watermark));
                }
            }
        }

        Ok(result)
    }

    #[inline]
    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
        let high_watermark_metrics =
            self.high_watermark_metrics
                .entry(partition)
                .or_insert_with(|| {
                    self.context
                        .metrics
                        .high_watermark
                        .with_guarded_label_values(&[
                            &self.context.info.source_id.to_string(),
                            &partition.to_string(),
                        ])
                });
        high_watermark_metrics.set(offset);
    }

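    /// Returns whether the topic's metadata can be fetched from the brokers within
    /// `sync_call_timeout`.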
    pub async fn check_reachability(&self) -> bool {
        self.client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await
            .is_ok()
    }

    async fn fetch_topic_partition(&self) -> ConnectorResult<Vec<i32>> {
        let metadata = self
            .client
            .fetch_metadata(Some(self.topic.as_str()), self.sync_call_timeout)
            .await?;

        let topic_meta = match metadata.topics() {
            [meta] => meta,
            _ => bail!("topic {} not found", self.topic),
        };

        if topic_meta.partitions().is_empty() {
            bail!("topic {} not found", self.topic);
        }

        Ok(topic_meta
            .partitions()
            .iter()
            .map(|partition| partition.id())
            .collect())
    }
511}