1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::swap;
18use std::time::Duration;
19
20use anyhow::Context;
21use async_trait::async_trait;
22use futures::{StreamExt, TryStreamExt};
23use futures_async_stream::try_stream;
24use rdkafka::consumer::{Consumer, StreamConsumer};
25use rdkafka::error::KafkaError;
26use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
27use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
28use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
29
30use crate::connector_common::read_kafka_log_level;
31use crate::error::ConnectorResult as Result;
32use crate::parser::ParserConfig;
33use crate::source::base::SourceMessage;
34use crate::source::kafka::{
35 KAFKA_ISOLATION_LEVEL, KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext,
36};
37use crate::source::{
38 BackfillInfo, BoxSourceChunkStream, BoxSourceReaderEventStream, Column, SourceContextRef,
39 SourceMessageEvent, SplitId, SplitImpl, SplitMetaData, SplitReader, into_chunk_event_stream,
40 into_chunk_stream,
41};
42
pub struct KafkaSplitReader {
    /// Underlying rdkafka consumer, assigned in `new` to the partitions of `splits`.
    consumer: StreamConsumer<RwConsumerContext>,
    /// Per-split `(start_offset, stop_offset)` captured at construction time.
    /// `start_offset` is the last already-consumed offset (reading resumes at
    /// `start_offset + 1`); `stop_offset` is exclusive.
    offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
    /// Whether each split had data to backfill when the reader was created,
    /// derived from the watermarks fetched in `new`.
    backfill_info: HashMap<SplitId, BackfillInfo>,
    /// Best-known inclusive end offset per split (max of the watermark-derived
    /// and stop-offset-derived values); used to report progress on partition EOF.
    known_eof_offsets: HashMap<SplitId, i64>,
    /// The splits assigned to this reader.
    splits: Vec<KafkaSplit>,
    /// Timeout for synchronous librdkafka calls such as `fetch_watermarks`.
    sync_call_timeout: Duration,
    /// Byte-rate limit per second; `usize::MAX` means unlimited.
    bytes_per_second: usize,
    /// Cap on total messages emitted by the stream; `usize::MAX` means unlimited.
    max_num_messages: usize,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
}
57
58#[async_trait]
59impl SplitReader for KafkaSplitReader {
60 type Properties = KafkaProperties;
61 type Split = KafkaSplit;
62
    /// Creates the rdkafka consumer, assigns it the given splits, and
    /// pre-computes per-split backfill/EOF bookkeeping from broker watermarks.
    ///
    /// Errors if the consumer cannot be created, watermarks cannot be fetched,
    /// or partition assignment fails.
    async fn new(
        properties: KafkaProperties,
        splits: Vec<KafkaSplit>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> Result<Self> {
        let mut config = ClientConfig::new();

        let bootstrap_servers = &properties.connection.brokers;
        let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();

        // Emit `PartitionEOF` errors from the consumer stream so the reader
        // can observe when a partition is fully caught up.
        config.set("enable.partition.eof", "true");
        config.set("auto.offset.reset", "smallest");
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        config.set("bootstrap.servers", bootstrap_servers);

        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);

        config.set("group.id", properties.group_id(source_ctx.fragment_id));

        let ctx_common = KafkaContextCommon::new(
            broker_rewrite_map,
            // Client id includes fragment/source/actor for observability.
            Some(format!(
                "fragment-{}-source-{}-actor-{}",
                source_ctx.fragment_id, source_ctx.source_id, source_ctx.actor_id
            )),
            Some(source_ctx.metrics.rdkafka_native_metric.clone()),
            properties.aws_auth_props,
            properties.connection.is_aws_msk_iam(),
        )
        .await?;

        let client_ctx = RwConsumerContext::new(ctx_common);

        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        let consumer: StreamConsumer<RwConsumerContext> = config
            .create_with_context(client_ctx)
            .await
            .context("failed to create kafka consumer")?;

        let mut tpl = TopicPartitionList::with_capacity(splits.len());

        let mut offsets = HashMap::new();
        let mut backfill_info = HashMap::new();
        let mut known_eof_offsets = HashMap::new();
        for split in splits.clone() {
            offsets.insert(split.id(), (split.start_offset, split.stop_offset));

            // `start_offset` is the last consumed offset, so resume reading
            // from the next one; without it, rely on `auto.offset.reset`.
            if let Some(offset) = split.start_offset {
                tpl.add_partition_offset(
                    split.topic.as_str(),
                    split.partition,
                    Offset::Offset(offset + 1),
                )?;
            } else {
                tpl.add_partition(split.topic.as_str(), split.partition);
            }

            let (low, high) = consumer
                .fetch_watermarks(
                    split.topic.as_str(),
                    split.partition,
                    properties.common.sync_call_timeout,
                )
                .await?;
            tracing::info!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
            // Data to backfill exists iff the partition is non-empty and the
            // next offset to read (`start_offset + 1`) is still below the high
            // watermark.
            let has_data_to_backfill =
                low != high && split.start_offset.is_none_or(|offset| offset + 1 < high);
            let watermark_offset = if has_data_to_backfill {
                debug_assert!(high > 0);
                // `high` is exclusive: the latest existing message sits at `high - 1`.
                Some(high - 1)
            } else {
                None
            };
            if !has_data_to_backfill {
                backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
            } else {
                backfill_info.insert(
                    split.id(),
                    BackfillInfo::HasDataToBackfill {
                        latest_offset: (high - 1).to_string(),
                    },
                );
            }

            // Remember the best-known inclusive end offset so that EOF handling
            // can report progress even if no message arrives at that offset.
            let stop_offset = split.stop_offset.and_then(Self::offset_before);
            if let Some(known_eof_offset) = Self::max_known_offset(watermark_offset, stop_offset) {
                known_eof_offsets.insert(split.id(), known_eof_offset);
            }
        }
        tracing::info!(
            topic = properties.common.topic,
            source_name = source_ctx.source_name,
            fragment_id = %source_ctx.fragment_id,
            source_id = %source_ctx.source_id,
            actor_id = %source_ctx.actor_id,
            "backfill_info: {:?}",
            backfill_info
        );

        consumer.assign(&tpl)?;

        // Rate/volume limits; absent properties mean "unlimited".
        let bytes_per_second = match properties.bytes_per_second {
            None => usize::MAX,
            Some(number) => number
                .parse::<usize>()
                .expect("bytes.per.second expect usize"),
        };
        let max_num_messages = match properties.max_num_messages {
            None => usize::MAX,
            Some(number) => number
                .parse::<usize>()
                .expect("max.num.messages expect usize"),
        };

        Ok(Self {
            consumer,
            offsets,
            splits,
            backfill_info,
            known_eof_offsets,
            bytes_per_second,
            sync_call_timeout: properties.common.sync_call_timeout,
            max_num_messages,
            parser_config,
            source_ctx,
        })
    }
202
203 fn into_stream(self) -> BoxSourceChunkStream {
204 let parser_config = self.parser_config.clone();
205 let source_context = self.source_ctx.clone();
206 let data_stream = self
207 .into_data_event_stream()
208 .try_filter_map(|event| async move {
209 Ok(match event {
210 SourceMessageEvent::Data(batch) => Some(batch),
211 SourceMessageEvent::SplitProgress(_) => None,
212 })
213 });
214 into_chunk_stream(data_stream, parser_config, source_context)
215 }
216
217 fn into_event_stream(self) -> BoxSourceReaderEventStream {
218 let parser_config = self.parser_config.clone();
219 let source_context = self.source_ctx.clone();
220 into_chunk_event_stream(self.into_data_event_stream(), parser_config, source_context)
221 }
222
223 fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
224 self.backfill_info.clone()
225 }
226
    /// Re-assigns every split to its current high watermark so the reader
    /// resumes from the latest position, and returns the updated splits.
    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        let mut latest_splits: Vec<SplitImpl> = Vec::new();
        let mut tpl = TopicPartitionList::with_capacity(self.splits.len());
        for mut split in self.splits.clone() {
            let (_low, high) = self
                .consumer
                .fetch_watermarks(
                    split.topic.as_str(),
                    split.partition,
                    self.sync_call_timeout,
                )
                .await?;
            // Seek to `high` (the next offset to be produced).
            tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::Offset(high))?;
            // Record `high - 1` as the last "consumed" offset.
            // NOTE(review): when the partition is empty (`high == 0`) this
            // yields `Some(-1)` — confirm downstream tolerates that sentinel.
            split.start_offset = Some(high - 1);
            latest_splits.push(split.into());
        }
        self.consumer.assign(&tpl)?;
        Ok(latest_splits)
    }
248}
249
250impl KafkaSplitReader {
251 fn offset_before(next_offset: i64) -> Option<i64> {
252 next_offset.checked_sub(1).filter(|offset| *offset >= 0)
253 }
254
255 fn max_known_offset(current_offset: Option<i64>, candidate_offset: Option<i64>) -> Option<i64> {
256 match (current_offset, candidate_offset) {
257 (Some(current_offset), Some(candidate_offset)) => {
258 Some(current_offset.max(candidate_offset))
259 }
260 (Some(current_offset), None) => Some(current_offset),
261 (None, Some(candidate_offset)) => Some(candidate_offset),
262 (None, None) => None,
263 }
264 }
265
266 fn record_split_progress(
267 progress: &mut HashMap<SplitId, String>,
268 latest_progress_offsets: &mut HashMap<SplitId, i64>,
269 split_id: &SplitId,
270 inclusive_offset: i64,
271 ) {
272 let should_emit = latest_progress_offsets
273 .get(split_id)
274 .is_none_or(|offset| *offset < inclusive_offset);
275 if should_emit {
276 latest_progress_offsets.insert(split_id.clone(), inclusive_offset);
277 progress.insert(split_id.clone(), inclusive_offset.to_string());
278 }
279 }
280
281 fn drain_stop_offsets_with_progress(
282 stop_offsets: &mut HashMap<SplitId, i64>,
283 progress: &HashMap<SplitId, String>,
284 ) {
285 let mut finished_splits = Vec::new();
286
287 for (split_id, progress_offset) in progress {
288 let Some(stop_offset) = stop_offsets.get(split_id).copied() else {
289 continue;
290 };
291 let Ok(progress_offset) = progress_offset.parse::<i64>() else {
292 tracing::warn!(
293 split_id = split_id.as_ref(),
294 progress_offset = progress_offset.as_str(),
295 "invalid split progress offset from kafka reader"
296 );
297 continue;
298 };
299
300 if progress_offset >= stop_offset - 1 {
302 tracing::debug!(
303 split_id = split_id.as_ref(),
304 stop_offset,
305 progress_offset,
306 "stop offset reached by split progress"
307 );
308 finished_splits.push(split_id.clone());
309 }
310 }
311
312 for split_id in finished_splits {
313 stop_offsets.remove(&split_id);
314 }
315 }
316
317 fn snapshot_split_progress(
318 eof_offsets: &HashMap<SplitId, i64>,
319 latest_progress_offsets: &mut HashMap<SplitId, i64>,
320 ) -> Option<HashMap<SplitId, String>> {
321 let mut progress = HashMap::new();
322
323 for (split_id, inclusive_offset) in eof_offsets {
324 Self::record_split_progress(
325 &mut progress,
326 latest_progress_offsets,
327 split_id,
328 *inclusive_offset,
329 );
330 }
331
332 (!progress.is_empty()).then_some(progress)
333 }
334
    /// Resolves, for each split whose partition hit EOF, the inclusive offset
    /// the reader is known to have reached: the max of the consumer's current
    /// position (minus one, since the position is the next offset to read) and
    /// the end offset pre-computed in `new`.
    fn resolve_eof_offsets(&self, eof_partitions: &HashSet<i32>) -> Result<HashMap<SplitId, i64>> {
        let positions = self.consumer.position()?;
        let mut eof_offsets = HashMap::new();

        for split in &self.splits {
            if !eof_partitions.contains(&split.partition) {
                continue;
            }

            let split_id = split.id();
            // Only a concrete `Offset::Offset` position is usable; special
            // offsets (Beginning/End/Stored/Invalid) carry no numeric value.
            let position_offset = positions
                .find_partition(split.topic.as_str(), split.partition)
                .and_then(|position| match position.offset() {
                    Offset::Offset(next_offset) => Self::offset_before(next_offset),
                    _ => None,
                });
            // Splits with no known offset from either source are skipped.
            let Some(inclusive_offset) = Self::max_known_offset(
                position_offset,
                self.known_eof_offsets.get(&split_id).copied(),
            ) else {
                continue;
            };
            eof_offsets.insert(split_id, inclusive_offset);
        }

        Ok(eof_offsets)
    }
362
363 fn apply_split_progress(
364 stop_offsets: &mut HashMap<SplitId, i64>,
365 split_progress: Option<HashMap<SplitId, String>>,
366 is_bounded: bool,
367 ) -> Option<HashMap<SplitId, String>> {
368 let progress = split_progress?;
369 if is_bounded {
370 Self::drain_stop_offsets_with_progress(stop_offsets, &progress);
371 }
372 Some(progress)
373 }
374
    /// Core read loop: yields batches of raw Kafka messages as
    /// `SourceMessageEvent::Data` and, on partition EOF, split-progress
    /// events. Terminates when every bounded split reaches its stop offset or
    /// when the `max_num_messages` cap is hit; throttles to `bytes_per_second`.
    #[try_stream(ok = SourceMessageEvent, error = crate::error::ConnectorError)]
    async fn into_data_event_stream(self) {
        // Fast path: if every split is already exhausted (next offset at or
        // past its exclusive stop, or stop offset 0), emit one empty batch
        // and finish immediately.
        if self.offsets.values().all(|(start_offset, stop_offset)| {
            match (start_offset, stop_offset) {
                (Some(start), Some(stop)) if (*start + 1) >= *stop => true,
                (_, Some(stop)) if *stop == 0 => true,
                _ => false,
            }
        }) {
            yield SourceMessageEvent::Data(Vec::new());
            return Ok(());
        };

        // Remaining exclusive stop offsets per split; entries are removed as
        // splits finish, and an empty map on a bounded read ends the stream.
        let mut stop_offsets: HashMap<_, _> = self
            .offsets
            .iter()
            .flat_map(|(split_id, (_, stop_offset))| {
                stop_offset
                    .filter(|offset| *offset > 0)
                    .map(|offset| (split_id.clone() as SplitId, offset))
            })
            .collect();
        let is_bounded = !stop_offsets.is_empty();

        // One-second window used for byte-rate throttling; the first tick of a
        // tokio interval completes immediately, so consume it up front.
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        interval.tick().await;
        let mut bytes_current_second = 0;
        let mut num_messages = 0;
        let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size;
        let mut res = Vec::with_capacity(max_chunk_size);
        // Message headers only need to be materialized if some column actually
        // reads them.
        let require_message_header = self.parser_config.common.rw_columns.iter().any(|col_desc| {
            matches!(
                col_desc.additional_column.column_type,
                Some(AdditionalColumnType::Headers(_) | AdditionalColumnType::HeaderInner(_))
            )
        });

        // Lazily-created, per-split metric handles keyed by split id string.
        let mut latest_message_id_metrics: HashMap<String, LabelGuardedIntGauge> = HashMap::new();
        let mut partition_eof_count_metrics: HashMap<String, LabelGuardedIntCounter> =
            HashMap::new();
        let mut partition_eof_offset_metrics: HashMap<String, LabelGuardedIntGauge> =
            HashMap::new();
        // Seed progress with the start offsets so stale EOF offsets are not
        // re-reported.
        let mut latest_progress_offsets = self
            .splits
            .iter()
            .filter_map(|split| split.start_offset.map(|offset| (split.id(), offset)))
            .collect::<HashMap<_, _>>();

        #[for_await]
        'for_outer_loop: for raw_msgs in self.consumer.stream().ready_chunks(max_chunk_size) {
            // Separate real messages from PartitionEOF notifications; any
            // other error aborts the stream.
            let mut eof_partitions = HashSet::new();
            let mut msgs = Vec::with_capacity(max_chunk_size);
            for msg in raw_msgs {
                match msg {
                    Ok(msg) => msgs.push(msg),
                    Err(KafkaError::PartitionEOF(partition)) => {
                        let split_id = partition.to_string();
                        partition_eof_count_metrics
                            .entry(split_id.clone())
                            .or_insert_with(|| {
                                self.source_ctx
                                    .metrics
                                    .partition_eof_count
                                    .with_guarded_label_values(&[
                                        &self.source_ctx.source_id.to_string(),
                                        &split_id,
                                        &self.source_ctx.source_name,
                                        &self.source_ctx.fragment_id.to_string(),
                                    ])
                            })
                            .inc();
                        eof_partitions.insert(partition);
                    }
                    Err(err) => return Err(err.into()),
                }
            }

            // Translate EOF partitions into the inclusive offsets reached.
            let eof_offsets = if eof_partitions.is_empty() {
                HashMap::new()
            } else {
                self.resolve_eof_offsets(&eof_partitions)?
            };
            for (split_id, eof_offset) in &eof_offsets {
                let split_id_label = split_id.as_ref().to_owned();
                partition_eof_offset_metrics
                    .entry(split_id_label.clone())
                    .or_insert_with(|| {
                        self.source_ctx
                            .metrics
                            .partition_eof_offset
                            .with_guarded_label_values(&[
                                &self.source_ctx.source_id.to_string(),
                                &split_id_label,
                                &self.source_ctx.source_name,
                                &self.source_ctx.fragment_id.to_string(),
                            ])
                    })
                    .set(*eof_offset);
                tracing::info!(
                    actor_id = %self.source_ctx.actor_id,
                    source_id = %self.source_ctx.source_id,
                    source_name = self.source_ctx.source_name,
                    fragment_id = %self.source_ctx.fragment_id,
                    split_id = %split_id,
                    eof_offset,
                    "received kafka partition EOF"
                );
            }
            // Keep only newly-advanced progress, then drain any stop offsets
            // that progress satisfies (bounded reads only).
            let split_progress =
                Self::snapshot_split_progress(&eof_offsets, &mut latest_progress_offsets);
            let split_progress =
                Self::apply_split_progress(&mut stop_offsets, split_progress, is_bounded);

            // EOF-only batch: report progress, maybe finish, but emit no data.
            if msgs.is_empty() {
                if let Some(progress) = split_progress {
                    yield SourceMessageEvent::SplitProgress(progress);
                }
                if is_bounded && stop_offsets.is_empty() {
                    if !res.is_empty() {
                        yield SourceMessageEvent::Data(res);
                    }
                    break 'for_outer_loop;
                }
                continue;
            }

            // Track the latest offset seen per partition for metrics; later
            // inserts overwrite earlier ones within the batch.
            let mut split_msg_offsets = HashMap::new();

            for msg in &msgs {
                split_msg_offsets.insert(msg.partition(), msg.offset());
            }

            for (partition, offset) in split_msg_offsets {
                let split_id = partition.to_string();
                latest_message_id_metrics
                    .entry(split_id.clone())
                    .or_insert_with(|| {
                        self.source_ctx
                            .metrics
                            .latest_message_id
                            .with_guarded_label_values(&[
                                &self.source_ctx.source_id.to_string(),
                                &self.source_ctx.actor_id.to_string(),
                                &split_id,
                            ])
                    })
                    .set(offset);
            }

            for msg in msgs {
                let cur_offset = msg.offset();
                bytes_current_second += match &msg.payload() {
                    None => 0,
                    Some(payload) => payload.len(),
                };
                num_messages += 1;
                let source_message =
                    SourceMessage::from_kafka_message(&msg, require_message_header);
                let split_id = source_message.split_id.clone();
                res.push(source_message);

                // Bounded split: stop offset is exclusive, so the last message
                // to deliver is the one at `stop_offset - 1`.
                if let Entry::Occupied(o) = stop_offsets.entry(split_id) {
                    let stop_offset = *o.get();

                    if cur_offset == stop_offset - 1 {
                        tracing::debug!(
                            "stop offset reached for split {}, stop reading, offset: {}, stop offset: {}",
                            o.key(),
                            cur_offset,
                            stop_offset
                        );

                        o.remove();

                        // All bounded splits finished: flush and terminate.
                        if stop_offsets.is_empty() {
                            yield SourceMessageEvent::Data(res);
                            break 'for_outer_loop;
                        }

                        continue;
                    }
                }

                // Byte-rate throttle: flush what we have, then wait for the
                // next one-second tick before continuing.
                if bytes_current_second > self.bytes_per_second {
                    let mut cur = Vec::with_capacity(res.capacity());
                    swap(&mut cur, &mut res);
                    yield SourceMessageEvent::Data(cur);
                    interval.tick().await;
                    bytes_current_second = 0;
                    // `res` is already the fresh empty Vec after the swap.
                    res.clear();
                }
                // Total message cap: flush and terminate.
                if num_messages >= self.max_num_messages {
                    yield SourceMessageEvent::Data(res);
                    break 'for_outer_loop;
                }
            }
            // Flush the batch (possibly empty) and any progress made this
            // round, keeping `res` allocated for reuse.
            let mut cur = Vec::with_capacity(res.capacity());
            swap(&mut cur, &mut res);
            yield SourceMessageEvent::Data(cur);
            if let Some(progress) = split_progress {
                yield SourceMessageEvent::SplitProgress(progress);
            }
            if is_bounded && stop_offsets.is_empty() {
                break 'for_outer_loop;
            }
        }
        tracing::info!("kafka reader finished");
    }
589}
590
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::KafkaSplitReader;
    use crate::source::SplitId;

    #[test]
    fn test_drain_stop_offsets_with_progress() {
        let mut stop_offsets = HashMap::from([
            (SplitId::from("0"), 10_i64),
            (SplitId::from("1"), 15_i64),
            (SplitId::from("2"), 20_i64),
        ]);
        let progress = HashMap::from([
            (SplitId::from("0"), "9".to_owned()),
            (SplitId::from("1"), "13".to_owned()),
            (SplitId::from("2"), "invalid".to_owned()),
        ]);

        KafkaSplitReader::drain_stop_offsets_with_progress(&mut stop_offsets, &progress);

        assert!(!stop_offsets.contains_key("0"));
        assert_eq!(stop_offsets.get("1"), Some(&15));
        assert_eq!(stop_offsets.get("2"), Some(&20));
    }

    #[test]
    fn test_drain_stop_offsets_with_progress_for_zero_stop_offset() {
        let mut stop_offsets = HashMap::from([(SplitId::from("0"), 0_i64)]);
        let progress = HashMap::from([(SplitId::from("0"), "0".to_owned())]);

        KafkaSplitReader::drain_stop_offsets_with_progress(&mut stop_offsets, &progress);

        assert!(stop_offsets.is_empty());
    }

    #[test]
    fn test_max_known_offset_prefers_larger_offset() {
        assert_eq!(
            KafkaSplitReader::max_known_offset(Some(3), Some(4)),
            Some(4)
        );
        assert_eq!(
            KafkaSplitReader::max_known_offset(Some(7), Some(4)),
            Some(7)
        );
        assert_eq!(KafkaSplitReader::max_known_offset(None, Some(4)), Some(4));
        assert_eq!(KafkaSplitReader::max_known_offset(Some(4), None), Some(4));
    }

    #[test]
    fn test_offset_before_rejects_non_positive_next_offset() {
        // `offset_before` maps an exclusive "next" offset to its inclusive
        // predecessor, rejecting results that would be negative.
        assert_eq!(KafkaSplitReader::offset_before(0), None);
        assert_eq!(KafkaSplitReader::offset_before(-1), None);
        assert_eq!(KafkaSplitReader::offset_before(i64::MIN), None);
        assert_eq!(KafkaSplitReader::offset_before(1), Some(0));
        assert_eq!(KafkaSplitReader::offset_before(42), Some(41));
    }

    #[test]
    fn test_apply_split_progress_for_empty_message_batch() {
        let mut stop_offsets = HashMap::from([(SplitId::from("0"), 5_i64)]);
        let split_progress = Some(HashMap::from([(SplitId::from("0"), "4".to_owned())]));

        let applied =
            KafkaSplitReader::apply_split_progress(&mut stop_offsets, split_progress, true);

        assert_eq!(
            applied,
            Some(HashMap::from([(SplitId::from("0"), "4".to_owned())]))
        );
        assert!(stop_offsets.is_empty());
    }
}