risingwave_connector/source/kafka/source/reader.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::mem::swap;
use std::time::Duration;

use anyhow::Context;
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::try_stream;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;

use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::source::base::SourceMessage;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext,
};
use crate::source::{
    BackfillInfo, BoxSourceChunkStream, Column, SourceContextRef, SplitId, SplitImpl,
    SplitMetaData, SplitReader, into_chunk_stream,
};

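/// [`SplitReader`] implementation for Kafka. A single rdkafka `StreamConsumer` is assigned all
/// splits (topic partitions) handed to this reader; `offsets` maps each split id to its
/// `(start_offset, stop_offset)` pair, and `backfill_info` records, per split, whether there is
/// historical data to backfill.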
pub struct KafkaSplitReader {
    consumer: StreamConsumer<RwConsumerContext>,
    offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
    backfill_info: HashMap<SplitId, BackfillInfo>,
    splits: Vec<KafkaSplit>,
    sync_call_timeout: Duration,
    bytes_per_second: usize,
    max_num_messages: usize,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
}

#[async_trait]
impl SplitReader for KafkaSplitReader {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    async fn new(
        properties: KafkaProperties,
        splits: Vec<KafkaSplit>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> Result<Self> {
        let mut config = ClientConfig::new();

        let bootstrap_servers = &properties.connection.brokers;
        let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();

        // disable partition eof
        config.set("enable.partition.eof", "false");
        config.set("auto.offset.reset", "smallest");
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        config.set("bootstrap.servers", bootstrap_servers);

        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);

        config.set("group.id", properties.group_id(source_ctx.fragment_id));

        let ctx_common = KafkaContextCommon::new(
            broker_rewrite_map,
            Some(format!(
                "fragment-{}-source-{}-actor-{}",
                source_ctx.fragment_id, source_ctx.source_id, source_ctx.actor_id
            )),
            // The thread consumer keeps polling in the background, so we don't need to call
            // `poll` explicitly.
            Some(source_ctx.metrics.rdkafka_native_metric.clone()),
            properties.aws_auth_props,
            properties.connection.is_aws_msk_iam(),
        )
        .await?;

        let client_ctx = RwConsumerContext::new(ctx_common);
        let consumer: StreamConsumer<RwConsumerContext> = config
            .set_log_level(RDKafkaLogLevel::Info)
            .create_with_context(client_ctx)
            .await
            .context("failed to create kafka consumer")?;

        let mut tpl = TopicPartitionList::with_capacity(splits.len());

        let mut offsets = HashMap::new();
        let mut backfill_info = HashMap::new();
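        // For each split, record its offsets, add it to the assignment list (resuming right
        // after `start_offset` if one is present), and decide from the partition watermarks
        // whether there is any data to backfill.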
        for split in splits.clone() {
            offsets.insert(split.id(), (split.start_offset, split.stop_offset));

            if let Some(offset) = split.start_offset {
                tpl.add_partition_offset(
                    split.topic.as_str(),
                    split.partition,
                    Offset::Offset(offset + 1),
                )?;
            } else {
                tpl.add_partition(split.topic.as_str(), split.partition);
            }

            let (low, high) = consumer
                .fetch_watermarks(
                    split.topic.as_str(),
                    split.partition,
                    properties.common.sync_call_timeout,
                )
                .await?;
            tracing::info!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
            // note: low is inclusive, high is exclusive, start_offset is exclusive
            if low == high || split.start_offset.is_some_and(|offset| offset + 1 >= high) {
                backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
            } else {
                debug_assert!(high > 0);
                backfill_info.insert(
                    split.id(),
                    BackfillInfo::HasDataToBackfill {
                        latest_offset: (high - 1).to_string(),
                    },
                );
            }
        }
        tracing::info!(
            topic = properties.common.topic,
            source_name = source_ctx.source_name,
            fragment_id = source_ctx.fragment_id,
            source_id = source_ctx.source_id.table_id,
            actor_id = source_ctx.actor_id,
            "backfill_info: {:?}",
            backfill_info
        );

        consumer.assign(&tpl)?;

        // The two parameters below are only used by developers for performance testing purposes,
        // so we panic here on purpose if the input is not correctly recognized.
        let bytes_per_second = match properties.bytes_per_second {
            None => usize::MAX,
            Some(number) => number
                .parse::<usize>()
                .expect("bytes.per.second expect usize"),
        };
        let max_num_messages = match properties.max_num_messages {
            None => usize::MAX,
            Some(number) => number
                .parse::<usize>()
                .expect("max.num.messages expect usize"),
        };

        Ok(Self {
            consumer,
            offsets,
            splits,
            backfill_info,
            bytes_per_second,
            sync_call_timeout: properties.common.sync_call_timeout,
            max_num_messages,
            parser_config,
            source_ctx,
        })
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        let parser_config = self.parser_config.clone();
        let source_context = self.source_ctx.clone();
        into_chunk_stream(self.into_data_stream(), parser_config, source_context)
    }

    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        self.backfill_info.clone()
    }

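    /// Fetches the current high watermark of every assigned partition, reassigns the consumer
    /// to start from there, and returns the splits with their start offsets updated to match.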
    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        let mut latest_splits: Vec<SplitImpl> = Vec::new();
        let mut tpl = TopicPartitionList::with_capacity(self.splits.len());
        for mut split in self.splits.clone() {
            // We can't learn the latest offset if we assign with `Offset::End`, so fetch the high watermark here instead.
            let (_low, high) = self
                .consumer
                .fetch_watermarks(
                    split.topic.as_str(),
                    split.partition,
                    self.sync_call_timeout,
                )
                .await?;
            tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::Offset(high))?;
            split.start_offset = Some(high - 1);
            latest_splits.push(split.into());
        }
        // replace the previous assignment
        self.consumer.assign(&tpl)?;
        Ok(latest_splits)
    }
}

impl KafkaSplitReader {
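    /// Reads from the assigned splits and yields messages in batches of at most
    /// `max_chunk_size`. A split stops being read once its stop offset is reached; reading is
    /// throttled when more than `bytes_per_second` bytes are read within a second, and the
    /// stream terminates after `max_num_messages` messages.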
    #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
    async fn into_data_stream(self) {
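        // If every split is already exhausted (the exclusive start offset is at or past the
        // stop offset, or the stop offset is 0), emit one empty batch and finish immediately.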
        if self.offsets.values().all(|(start_offset, stop_offset)| {
            match (start_offset, stop_offset) {
                (Some(start), Some(stop)) if (*start + 1) >= *stop => true,
                (_, Some(stop)) if *stop == 0 => true,
                _ => false,
            }
        }) {
            yield Vec::new();
            return Ok(());
        };

        let mut stop_offsets: HashMap<_, _> = self
            .offsets
            .iter()
            .flat_map(|(split_id, (_, stop_offset))| {
                stop_offset.map(|offset| (split_id.clone() as SplitId, offset))
            })
            .collect();

        let mut interval = tokio::time::interval(Duration::from_secs(1));
        interval.tick().await;
        let mut bytes_current_second = 0;
        let mut num_messages = 0;
        let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size;
        let mut res = Vec::with_capacity(max_chunk_size);
        // Ingesting Kafka message headers can be expensive, so do it only when required.
        let require_message_header = self.parser_config.common.rw_columns.iter().any(|col_desc| {
            matches!(
                col_desc.additional_column.column_type,
                Some(AdditionalColumnType::Headers(_) | AdditionalColumnType::HeaderInner(_))
            )
        });

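        // Per-split gauges reporting the latest message offset observed, created lazily on
        // first use.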
        let mut latest_message_id_metrics: HashMap<String, LabelGuardedIntGauge<3>> =
            HashMap::new();

        #[for_await]
        'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(max_chunk_size) {
            let msgs: Vec<_> = msgs
                .into_iter()
                .collect::<std::result::Result<_, KafkaError>>()?;

            let mut split_msg_offsets = HashMap::new();

            for msg in &msgs {
                split_msg_offsets.insert(msg.partition(), msg.offset());
            }

            for (partition, offset) in split_msg_offsets {
                let split_id = partition.to_string();
                latest_message_id_metrics
                    .entry(split_id.clone())
                    .or_insert_with(|| {
                        self.source_ctx
                            .metrics
                            .latest_message_id
                            .with_guarded_label_values(&[
                                // source name is not available here
                                &self.source_ctx.source_id.to_string(),
                                &self.source_ctx.actor_id.to_string(),
                                &split_id,
                            ])
                    })
                    .set(offset);
            }

            for msg in msgs {
                let cur_offset = msg.offset();
                bytes_current_second += match &msg.payload() {
                    None => 0,
                    Some(payload) => payload.len(),
                };
                num_messages += 1;
                let source_message =
                    SourceMessage::from_kafka_message(&msg, require_message_header);
                let split_id = source_message.split_id.clone();
                res.push(source_message);

                if let Entry::Occupied(o) = stop_offsets.entry(split_id) {
                    let stop_offset = *o.get();

                    if cur_offset == stop_offset - 1 {
                        tracing::debug!(
                            "stop offset reached for split {}, stop reading, offset: {}, stop offset: {}",
                            o.key(),
                            cur_offset,
                            stop_offset
                        );

                        o.remove();

                        if stop_offsets.is_empty() {
                            yield res;
                            break 'for_outer_loop;
                        }

                        continue;
                    }
                }

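                // Rudimentary rate limiting: once more than `bytes_per_second` bytes have been
                // read, flush the batch and wait for the next one-second tick.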
                if bytes_current_second > self.bytes_per_second {
                    // swap to make compiler happy
                    let mut cur = Vec::with_capacity(res.capacity());
                    swap(&mut cur, &mut res);
                    yield cur;
                    interval.tick().await;
                    bytes_current_second = 0;
                    res.clear();
                }
                if num_messages >= self.max_num_messages {
                    yield res;
                    break 'for_outer_loop;
                }
            }
            let mut cur = Vec::with_capacity(res.capacity());
            swap(&mut cur, &mut res);
            yield cur;
            // Don't reset `bytes_current_second` here, as it is only tied to the one-second
            // `interval.tick()` above.
            // Yield in the outer loop so that the messages read so far are always emitted for
            // every ready chunk of at most `max_chunk_size` messages.
        }
        tracing::info!("kafka reader finished");
    }
}