// risingwave_connector/source/kafka/source/reader.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::swap;
18use std::time::Duration;
19
20use anyhow::Context;
21use async_trait::async_trait;
22use futures::{StreamExt, TryStreamExt};
23use futures_async_stream::try_stream;
24use rdkafka::consumer::{Consumer, StreamConsumer};
25use rdkafka::error::KafkaError;
26use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
27use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
28use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
29
30use crate::connector_common::read_kafka_log_level;
31use crate::error::ConnectorResult as Result;
32use crate::parser::ParserConfig;
33use crate::source::base::SourceMessage;
34use crate::source::kafka::{
35    KAFKA_ISOLATION_LEVEL, KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext,
36};
37use crate::source::{
38    BackfillInfo, BoxSourceChunkStream, BoxSourceReaderEventStream, Column, SourceContextRef,
39    SourceMessageEvent, SplitId, SplitImpl, SplitMetaData, SplitReader, into_chunk_event_stream,
40    into_chunk_stream,
41};
42
/// Reader that consumes a set of Kafka partition splits through a single
/// rdkafka [`StreamConsumer`] and turns messages into source events.
pub struct KafkaSplitReader {
    // Underlying rdkafka stream consumer with our custom consumer context.
    consumer: StreamConsumer<RwConsumerContext>,
    // Per split id: `(start_offset, stop_offset)` taken from the assigned
    // `KafkaSplit`s. Start is exclusive (consumption resumes at `start + 1`,
    // see `new`); stop is exclusive as well.
    offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
    // Backfill status per split, computed once from partition watermarks in `new`.
    backfill_info: HashMap<SplitId, BackfillInfo>,
    /// The furthest inclusive offset that is known to exist when the reader observes EOF for a
    /// split, even if that offset does not correspond to a visible data record.
    known_eof_offsets: HashMap<SplitId, i64>,
    // The splits assigned to this reader.
    splits: Vec<KafkaSplit>,
    // Timeout for synchronous rdkafka calls such as `fetch_watermarks`.
    sync_call_timeout: Duration,
    // Developer-only throttling knobs (see `new`); `usize::MAX` means unlimited.
    bytes_per_second: usize,
    max_num_messages: usize,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
}
57
58#[async_trait]
59impl SplitReader for KafkaSplitReader {
60    type Properties = KafkaProperties;
61    type Split = KafkaSplit;
62
    /// Builds the rdkafka consumer, assigns the requested partitions, and
    /// precomputes backfill / EOF bookkeeping for every split.
    ///
    /// For each split with a `start_offset`, consumption resumes at
    /// `start_offset + 1` (start offsets are exclusive). Watermarks are
    /// fetched once per split to decide whether there is data to backfill.
    async fn new(
        properties: KafkaProperties,
        splits: Vec<KafkaSplit>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> Result<Self> {
        let mut config = ClientConfig::new();

        let bootstrap_servers = &properties.connection.brokers;
        let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();

        // Enable partition EOF to emit split progress updates when the fetched data ends with
        // non-deliverable records, e.g. transactional control records in read-committed mode.
        config.set("enable.partition.eof", "true");
        config.set("auto.offset.reset", "smallest");
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        config.set("bootstrap.servers", bootstrap_servers);

        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);

        config.set("group.id", properties.group_id(source_ctx.fragment_id));

        let ctx_common = KafkaContextCommon::new(
            broker_rewrite_map,
            Some(format!(
                "fragment-{}-source-{}-actor-{}",
                source_ctx.fragment_id, source_ctx.source_id, source_ctx.actor_id
            )),
            // thread consumer will keep polling in the background, we don't need to call `poll`
            // explicitly
            Some(source_ctx.metrics.rdkafka_native_metric.clone()),
            properties.aws_auth_props,
            properties.connection.is_aws_msk_iam(),
        )
        .await?;

        let client_ctx = RwConsumerContext::new(ctx_common);

        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        let consumer: StreamConsumer<RwConsumerContext> = config
            .create_with_context(client_ctx)
            .await
            .context("failed to create kafka consumer")?;

        let mut tpl = TopicPartitionList::with_capacity(splits.len());

        let mut offsets = HashMap::new();
        let mut backfill_info = HashMap::new();
        let mut known_eof_offsets = HashMap::new();
        for split in splits.clone() {
            offsets.insert(split.id(), (split.start_offset, split.stop_offset));

            // Start offsets are exclusive: resume one past the recorded offset.
            if let Some(offset) = split.start_offset {
                tpl.add_partition_offset(
                    split.topic.as_str(),
                    split.partition,
                    Offset::Offset(offset + 1),
                )?;
            } else {
                tpl.add_partition(split.topic.as_str(), split.partition);
            }

            let (low, high) = consumer
                .fetch_watermarks(
                    split.topic.as_str(),
                    split.partition,
                    properties.common.sync_call_timeout,
                )
                .await?;
            tracing::info!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
            // note: low is inclusive, high is exclusive, start_offset is exclusive
            let has_data_to_backfill =
                low != high && split.start_offset.is_none_or(|offset| offset + 1 < high);
            // `high - 1` is the last (inclusive) offset the partition currently holds.
            let watermark_offset = if has_data_to_backfill {
                debug_assert!(high > 0);
                Some(high - 1)
            } else {
                None
            };
            if !has_data_to_backfill {
                backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
            } else {
                backfill_info.insert(
                    split.id(),
                    BackfillInfo::HasDataToBackfill {
                        latest_offset: (high - 1).to_string(),
                    },
                );
            }

            // Seed the known-EOF offset from whichever is larger: the current
            // watermark or the split's (exclusive) stop offset converted to its
            // inclusive predecessor.
            let stop_offset = split.stop_offset.and_then(Self::offset_before);
            if let Some(known_eof_offset) = Self::max_known_offset(watermark_offset, stop_offset) {
                known_eof_offsets.insert(split.id(), known_eof_offset);
            }
        }
        tracing::info!(
            topic = properties.common.topic,
            source_name = source_ctx.source_name,
            fragment_id = %source_ctx.fragment_id,
            source_id = %source_ctx.source_id,
            actor_id = %source_ctx.actor_id,
            "backfill_info: {:?}",
            backfill_info
        );

        consumer.assign(&tpl)?;

        // The two parameters below are only used by developers for performance testing purposes,
        // so we panic here on purpose if the input is not correctly recognized.
        let bytes_per_second = match properties.bytes_per_second {
            None => usize::MAX,
            Some(number) => number
                .parse::<usize>()
                .expect("bytes.per.second expect usize"),
        };
        let max_num_messages = match properties.max_num_messages {
            None => usize::MAX,
            Some(number) => number
                .parse::<usize>()
                .expect("max.num.messages expect usize"),
        };

        Ok(Self {
            consumer,
            offsets,
            splits,
            backfill_info,
            known_eof_offsets,
            bytes_per_second,
            sync_call_timeout: properties.common.sync_call_timeout,
            max_num_messages,
            parser_config,
            source_ctx,
        })
    }
202
203    fn into_stream(self) -> BoxSourceChunkStream {
204        let parser_config = self.parser_config.clone();
205        let source_context = self.source_ctx.clone();
206        let data_stream = self
207            .into_data_event_stream()
208            .try_filter_map(|event| async move {
209                Ok(match event {
210                    SourceMessageEvent::Data(batch) => Some(batch),
211                    SourceMessageEvent::SplitProgress(_) => None,
212                })
213            });
214        into_chunk_stream(data_stream, parser_config, source_context)
215    }
216
217    fn into_event_stream(self) -> BoxSourceReaderEventStream {
218        let parser_config = self.parser_config.clone();
219        let source_context = self.source_ctx.clone();
220        into_chunk_event_stream(self.into_data_event_stream(), parser_config, source_context)
221    }
222
223    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
224        self.backfill_info.clone()
225    }
226
    /// Re-assigns every split to the current high watermark so the reader
    /// skips all existing messages, returning the splits updated with the
    /// latest observed offsets.
    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        let mut latest_splits: Vec<SplitImpl> = Vec::new();
        let mut tpl = TopicPartitionList::with_capacity(self.splits.len());
        for mut split in self.splits.clone() {
            // we can't get latest offset if we use Offset::End, so we just fetch watermark here.
            let (_low, high) = self
                .consumer
                .fetch_watermarks(
                    split.topic.as_str(),
                    split.partition,
                    self.sync_call_timeout,
                )
                .await?;
            // Seek to `high` (the next offset to be produced).
            tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::Offset(high))?;
            // `start_offset` is exclusive, so the last consumed offset is `high - 1`.
            // NOTE(review): for an empty partition `high == 0` yields `Some(-1)`;
            // presumably downstream treats this like "nothing consumed yet" — confirm.
            split.start_offset = Some(high - 1);
            latest_splits.push(split.into());
        }
        // replace the previous assignment
        self.consumer.assign(&tpl)?;
        Ok(latest_splits)
    }
248}
249
250impl KafkaSplitReader {
251    fn offset_before(next_offset: i64) -> Option<i64> {
252        next_offset.checked_sub(1).filter(|offset| *offset >= 0)
253    }
254
255    fn max_known_offset(current_offset: Option<i64>, candidate_offset: Option<i64>) -> Option<i64> {
256        match (current_offset, candidate_offset) {
257            (Some(current_offset), Some(candidate_offset)) => {
258                Some(current_offset.max(candidate_offset))
259            }
260            (Some(current_offset), None) => Some(current_offset),
261            (None, Some(candidate_offset)) => Some(candidate_offset),
262            (None, None) => None,
263        }
264    }
265
266    fn record_split_progress(
267        progress: &mut HashMap<SplitId, String>,
268        latest_progress_offsets: &mut HashMap<SplitId, i64>,
269        split_id: &SplitId,
270        inclusive_offset: i64,
271    ) {
272        let should_emit = latest_progress_offsets
273            .get(split_id)
274            .is_none_or(|offset| *offset < inclusive_offset);
275        if should_emit {
276            latest_progress_offsets.insert(split_id.clone(), inclusive_offset);
277            progress.insert(split_id.clone(), inclusive_offset.to_string());
278        }
279    }
280
281    fn drain_stop_offsets_with_progress(
282        stop_offsets: &mut HashMap<SplitId, i64>,
283        progress: &HashMap<SplitId, String>,
284    ) {
285        let mut finished_splits = Vec::new();
286
287        for (split_id, progress_offset) in progress {
288            let Some(stop_offset) = stop_offsets.get(split_id).copied() else {
289                continue;
290            };
291            let Ok(progress_offset) = progress_offset.parse::<i64>() else {
292                tracing::warn!(
293                    split_id = split_id.as_ref(),
294                    progress_offset = progress_offset.as_str(),
295                    "invalid split progress offset from kafka reader"
296                );
297                continue;
298            };
299
300            // `stop_offset` is exclusive while progress is inclusive.
301            if progress_offset >= stop_offset - 1 {
302                tracing::debug!(
303                    split_id = split_id.as_ref(),
304                    stop_offset,
305                    progress_offset,
306                    "stop offset reached by split progress"
307                );
308                finished_splits.push(split_id.clone());
309            }
310        }
311
312        for split_id in finished_splits {
313            stop_offsets.remove(&split_id);
314        }
315    }
316
317    fn snapshot_split_progress(
318        eof_offsets: &HashMap<SplitId, i64>,
319        latest_progress_offsets: &mut HashMap<SplitId, i64>,
320    ) -> Option<HashMap<SplitId, String>> {
321        let mut progress = HashMap::new();
322
323        for (split_id, inclusive_offset) in eof_offsets {
324            Self::record_split_progress(
325                &mut progress,
326                latest_progress_offsets,
327                split_id,
328                *inclusive_offset,
329            );
330        }
331
332        (!progress.is_empty()).then_some(progress)
333    }
334
    /// Resolves, for each split whose partition just hit EOF, the furthest
    /// inclusive offset known to exist: the larger of the consumer's current
    /// position (minus one, since position is the *next* offset) and the
    /// offset recorded in `known_eof_offsets` at construction time.
    /// Splits with no resolvable offset are omitted from the result.
    fn resolve_eof_offsets(&self, eof_partitions: &HashSet<i32>) -> Result<HashMap<SplitId, i64>> {
        let positions = self.consumer.position()?;
        let mut eof_offsets = HashMap::new();

        for split in &self.splits {
            if !eof_partitions.contains(&split.partition) {
                continue;
            }

            let split_id = split.id();
            // The consumer position is the next offset to consume; convert it
            // to the inclusive last-known offset. Non-concrete positions
            // (e.g. invalid/stored) yield `None`.
            let position_offset = positions
                .find_partition(split.topic.as_str(), split.partition)
                .and_then(|position| match position.offset() {
                    Offset::Offset(next_offset) => Self::offset_before(next_offset),
                    _ => None,
                });
            let Some(inclusive_offset) = Self::max_known_offset(
                position_offset,
                self.known_eof_offsets.get(&split_id).copied(),
            ) else {
                continue;
            };
            eof_offsets.insert(split_id, inclusive_offset);
        }

        Ok(eof_offsets)
    }
362
363    fn apply_split_progress(
364        stop_offsets: &mut HashMap<SplitId, i64>,
365        split_progress: Option<HashMap<SplitId, String>>,
366        is_bounded: bool,
367    ) -> Option<HashMap<SplitId, String>> {
368        let progress = split_progress?;
369        if is_bounded {
370            Self::drain_stop_offsets_with_progress(stop_offsets, &progress);
371        }
372        Some(progress)
373    }
374
    /// Core consumption loop: turns the rdkafka message stream into a stream
    /// of [`SourceMessageEvent`]s — `Data` batches of at most `chunk_size`
    /// messages, interleaved with `SplitProgress` events derived from
    /// partition-EOF notifications. Terminates when all bounded splits reach
    /// their stop offsets, or when the developer-only message cap is hit.
    #[try_stream(ok = SourceMessageEvent, error = crate::error::ConnectorError)]
    async fn into_data_event_stream(self) {
        // Fast path: every split is already exhausted (start + 1 >= stop, or
        // stop == 0), so emit one empty batch and finish immediately.
        if self.offsets.values().all(|(start_offset, stop_offset)| {
            match (start_offset, stop_offset) {
                (Some(start), Some(stop)) if (*start + 1) >= *stop => true,
                (_, Some(stop)) if *stop == 0 => true,
                _ => false,
            }
        }) {
            yield SourceMessageEvent::Data(Vec::new());
            return Ok(());
        };

        // Remaining (exclusive) stop offsets per split; entries are removed
        // as splits finish, and an empty map on a bounded read ends the loop.
        let mut stop_offsets: HashMap<_, _> = self
            .offsets
            .iter()
            .flat_map(|(split_id, (_, stop_offset))| {
                stop_offset
                    .filter(|offset| *offset > 0)
                    .map(|offset| (split_id.clone() as SplitId, offset))
            })
            .collect();
        let is_bounded = !stop_offsets.is_empty();

        // 1-second ticker used for the bytes-per-second throttle below.
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        interval.tick().await;
        let mut bytes_current_second = 0;
        let mut num_messages = 0;
        let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size;
        let mut res = Vec::with_capacity(max_chunk_size);
        // ingest kafka message header can be expensive, do it only when required
        let require_message_header = self.parser_config.common.rw_columns.iter().any(|col_desc| {
            matches!(
                col_desc.additional_column.column_type,
                Some(AdditionalColumnType::Headers(_) | AdditionalColumnType::HeaderInner(_))
            )
        });

        // Lazily-created, label-guarded metric handles, keyed by split id
        // string so labels are resolved only once per split.
        let mut latest_message_id_metrics: HashMap<String, LabelGuardedIntGauge> = HashMap::new();
        let mut partition_eof_count_metrics: HashMap<String, LabelGuardedIntCounter> =
            HashMap::new();
        let mut partition_eof_offset_metrics: HashMap<String, LabelGuardedIntGauge> =
            HashMap::new();
        // High-water marks of already-reported progress, seeded from the
        // splits' start offsets so we never report backwards.
        let mut latest_progress_offsets = self
            .splits
            .iter()
            .filter_map(|split| split.start_offset.map(|offset| (split.id(), offset)))
            .collect::<HashMap<_, _>>();

        #[for_await]
        'for_outer_loop: for raw_msgs in self.consumer.stream().ready_chunks(max_chunk_size) {
            let mut eof_partitions = HashSet::new();
            let mut msgs = Vec::with_capacity(max_chunk_size);
            // Separate real messages from partition-EOF notifications; any
            // other error aborts the stream.
            for msg in raw_msgs {
                match msg {
                    Ok(msg) => msgs.push(msg),
                    Err(KafkaError::PartitionEOF(partition)) => {
                        let split_id = partition.to_string();
                        partition_eof_count_metrics
                            .entry(split_id.clone())
                            .or_insert_with(|| {
                                self.source_ctx
                                    .metrics
                                    .partition_eof_count
                                    .with_guarded_label_values(&[
                                        &self.source_ctx.source_id.to_string(),
                                        &split_id,
                                        &self.source_ctx.source_name,
                                        &self.source_ctx.fragment_id.to_string(),
                                    ])
                            })
                            .inc();
                        eof_partitions.insert(partition);
                    }
                    Err(err) => return Err(err.into()),
                }
            }

            // Translate EOF notifications into inclusive offsets (consumer
            // position / known EOF offsets), then into progress updates.
            let eof_offsets = if eof_partitions.is_empty() {
                HashMap::new()
            } else {
                self.resolve_eof_offsets(&eof_partitions)?
            };
            for (split_id, eof_offset) in &eof_offsets {
                let split_id_label = split_id.as_ref().to_owned();
                partition_eof_offset_metrics
                    .entry(split_id_label.clone())
                    .or_insert_with(|| {
                        self.source_ctx
                            .metrics
                            .partition_eof_offset
                            .with_guarded_label_values(&[
                                &self.source_ctx.source_id.to_string(),
                                &split_id_label,
                                &self.source_ctx.source_name,
                                &self.source_ctx.fragment_id.to_string(),
                            ])
                    })
                    .set(*eof_offset);
                tracing::info!(
                    actor_id = %self.source_ctx.actor_id,
                    source_id = %self.source_ctx.source_id,
                    source_name = self.source_ctx.source_name,
                    fragment_id = %self.source_ctx.fragment_id,
                    split_id = %split_id,
                    eof_offset,
                    "received kafka partition EOF"
                );
            }
            let split_progress =
                Self::snapshot_split_progress(&eof_offsets, &mut latest_progress_offsets);
            let split_progress =
                Self::apply_split_progress(&mut stop_offsets, split_progress, is_bounded);

            // EOF-only batch: emit any progress, and finish if the progress
            // retired the last stop offset of a bounded read.
            if msgs.is_empty() {
                if let Some(progress) = split_progress {
                    yield SourceMessageEvent::SplitProgress(progress);
                }
                if is_bounded && stop_offsets.is_empty() {
                    if !res.is_empty() {
                        yield SourceMessageEvent::Data(res);
                    }
                    break 'for_outer_loop;
                }
                continue;
            }

            // Track the highest offset seen per partition in this batch for
            // the latest-message-id gauge.
            let mut split_msg_offsets = HashMap::new();

            for msg in &msgs {
                split_msg_offsets.insert(msg.partition(), msg.offset());
            }

            for (partition, offset) in split_msg_offsets {
                let split_id = partition.to_string();
                latest_message_id_metrics
                    .entry(split_id.clone())
                    .or_insert_with(|| {
                        self.source_ctx
                            .metrics
                            .latest_message_id
                            .with_guarded_label_values(&[
                                // source name is not available here
                                &self.source_ctx.source_id.to_string(),
                                &self.source_ctx.actor_id.to_string(),
                                &split_id,
                            ])
                    })
                    .set(offset);
            }

            for msg in msgs {
                let cur_offset = msg.offset();
                bytes_current_second += match &msg.payload() {
                    None => 0,
                    Some(payload) => payload.len(),
                };
                num_messages += 1;
                let source_message =
                    SourceMessage::from_kafka_message(&msg, require_message_header);
                let split_id = source_message.split_id.clone();
                res.push(source_message);

                // NOTE(review): this relies on exact equality with the last
                // offset before `stop_offset`; offsets skipped by control
                // records are presumably covered by the EOF/progress path
                // above (which uses `>=`) — confirm.
                if let Entry::Occupied(o) = stop_offsets.entry(split_id) {
                    let stop_offset = *o.get();

                    if cur_offset == stop_offset - 1 {
                        tracing::debug!(
                            "stop offset reached for split {}, stop reading, offset: {}, stop offset: {}",
                            o.key(),
                            cur_offset,
                            stop_offset
                        );

                        o.remove();

                        if stop_offsets.is_empty() {
                            yield SourceMessageEvent::Data(res);
                            break 'for_outer_loop;
                        }

                        continue;
                    }
                }

                // Throttle: flush what we have, then wait for the next tick
                // before consuming more bytes this second.
                if bytes_current_second > self.bytes_per_second {
                    // swap to make compiler happy
                    let mut cur = Vec::with_capacity(res.capacity());
                    swap(&mut cur, &mut res);
                    yield SourceMessageEvent::Data(cur);
                    interval.tick().await;
                    bytes_current_second = 0;
                    res.clear();
                }
                if num_messages >= self.max_num_messages {
                    yield SourceMessageEvent::Data(res);
                    break 'for_outer_loop;
                }
            }
            let mut cur = Vec::with_capacity(res.capacity());
            swap(&mut cur, &mut res);
            yield SourceMessageEvent::Data(cur);
            if let Some(progress) = split_progress {
                yield SourceMessageEvent::SplitProgress(progress);
            }
            if is_bounded && stop_offsets.is_empty() {
                break 'for_outer_loop;
            }
            // don't clear `bytes_current_second` here as it is only related to `.tick()`.
            // yield in the outer loop so that we can always guarantee that some messages are read
            // every `MAX_CHUNK_SIZE`.
        }
        tracing::info!("kafka reader finished");
    }
589}
590
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::KafkaSplitReader;
    use crate::source::SplitId;

    // Progress at/past `stop - 1` retires the split; unparsable progress and
    // progress short of the stop offset leave the entry in place.
    #[test]
    fn test_drain_stop_offsets_with_progress() {
        let mut stop_offsets = HashMap::from([
            (SplitId::from("0"), 10_i64),
            (SplitId::from("1"), 15_i64),
            (SplitId::from("2"), 20_i64),
        ]);
        let progress = HashMap::from([
            (SplitId::from("0"), "9".to_owned()),
            (SplitId::from("1"), "13".to_owned()),
            (SplitId::from("2"), "invalid".to_owned()),
        ]);

        KafkaSplitReader::drain_stop_offsets_with_progress(&mut stop_offsets, &progress);

        assert!(!stop_offsets.contains_key("0"));
        assert_eq!(stop_offsets.get("1"), Some(&15));
        assert_eq!(stop_offsets.get("2"), Some(&20));
    }

    // A zero stop offset is satisfied immediately (progress 0 >= 0 - 1).
    #[test]
    fn test_drain_stop_offsets_with_progress_for_zero_stop_offset() {
        let mut stop_offsets = HashMap::from([(SplitId::from("0"), 0_i64)]);
        let progress = HashMap::from([(SplitId::from("0"), "0".to_owned())]);

        KafkaSplitReader::drain_stop_offsets_with_progress(&mut stop_offsets, &progress);

        assert!(stop_offsets.is_empty());
    }

    // `max_known_offset` picks the larger of two optional offsets and treats
    // `None` as "no information" rather than zero.
    #[test]
    fn test_max_known_offset_prefers_larger_offset() {
        assert_eq!(
            KafkaSplitReader::max_known_offset(Some(3), Some(4)),
            Some(4)
        );
        assert_eq!(
            KafkaSplitReader::max_known_offset(Some(7), Some(4)),
            Some(7)
        );
        assert_eq!(KafkaSplitReader::max_known_offset(None, Some(4)), Some(4));
        assert_eq!(KafkaSplitReader::max_known_offset(Some(4), None), Some(4));
    }

    // With `is_bounded == true`, applying progress both drains satisfied stop
    // offsets and returns the progress unchanged to the caller.
    #[test]
    fn test_apply_split_progress_for_empty_message_batch() {
        let mut stop_offsets = HashMap::from([(SplitId::from("0"), 5_i64)]);
        let split_progress = Some(HashMap::from([(SplitId::from("0"), "4".to_owned())]));

        let applied =
            KafkaSplitReader::apply_split_progress(&mut stop_offsets, split_progress, true);

        assert_eq!(
            applied,
            Some(HashMap::from([(SplitId::from("0"), "4".to_owned())]))
        );
        assert!(stop_offsets.is_empty());
    }
}