risingwave_connector/source/kafka/source/
reader.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::mem::swap;
18use std::time::Duration;
19
20use anyhow::Context;
21use async_trait::async_trait;
22use futures::StreamExt;
23use futures_async_stream::try_stream;
24use rdkafka::consumer::{Consumer, StreamConsumer};
25use rdkafka::error::KafkaError;
26use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
29
30use crate::connector_common::read_kafka_log_level;
31use crate::error::ConnectorResult as Result;
32use crate::parser::ParserConfig;
33use crate::source::base::SourceMessage;
34use crate::source::kafka::{
35    KAFKA_ISOLATION_LEVEL, KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext,
36};
37use crate::source::{
38    BackfillInfo, BoxSourceChunkStream, Column, SourceContextRef, SplitId, SplitImpl,
39    SplitMetaData, SplitReader, into_chunk_stream,
40};
41
42pub struct KafkaSplitReader {
43    consumer: StreamConsumer<RwConsumerContext>,
44    offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
45    backfill_info: HashMap<SplitId, BackfillInfo>,
46    splits: Vec<KafkaSplit>,
47    sync_call_timeout: Duration,
48    bytes_per_second: usize,
49    max_num_messages: usize,
50    parser_config: ParserConfig,
51    source_ctx: SourceContextRef,
52}
53
54#[async_trait]
55impl SplitReader for KafkaSplitReader {
56    type Properties = KafkaProperties;
57    type Split = KafkaSplit;
58
59    async fn new(
60        properties: KafkaProperties,
61        splits: Vec<KafkaSplit>,
62        parser_config: ParserConfig,
63        source_ctx: SourceContextRef,
64        _columns: Option<Vec<Column>>,
65    ) -> Result<Self> {
66        let mut config = ClientConfig::new();
67
68        let bootstrap_servers = &properties.connection.brokers;
69        let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();
70
71        // disable partition eof
72        config.set("enable.partition.eof", "false");
73        config.set("auto.offset.reset", "smallest");
74        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
75        config.set("bootstrap.servers", bootstrap_servers);
76
77        properties.connection.set_security_properties(&mut config);
78        properties.set_client(&mut config);
79
80        config.set("group.id", properties.group_id(source_ctx.fragment_id));
81
82        let ctx_common = KafkaContextCommon::new(
83            broker_rewrite_map,
84            Some(format!(
85                "fragment-{}-source-{}-actor-{}",
86                source_ctx.fragment_id, source_ctx.source_id, source_ctx.actor_id
87            )),
88            // thread consumer will keep polling in the background, we don't need to call `poll`
89            // explicitly
90            Some(source_ctx.metrics.rdkafka_native_metric.clone()),
91            properties.aws_auth_props,
92            properties.connection.is_aws_msk_iam(),
93        )
94        .await?;
95
96        let client_ctx = RwConsumerContext::new(ctx_common);
97
98        if let Some(log_level) = read_kafka_log_level() {
99            config.set_log_level(log_level);
100        }
101        let consumer: StreamConsumer<RwConsumerContext> = config
102            .create_with_context(client_ctx)
103            .await
104            .context("failed to create kafka consumer")?;
105
106        let mut tpl = TopicPartitionList::with_capacity(splits.len());
107
108        let mut offsets = HashMap::new();
109        let mut backfill_info = HashMap::new();
110        for split in splits.clone() {
111            offsets.insert(split.id(), (split.start_offset, split.stop_offset));
112
113            if let Some(offset) = split.start_offset {
114                tpl.add_partition_offset(
115                    split.topic.as_str(),
116                    split.partition,
117                    Offset::Offset(offset + 1),
118                )?;
119            } else {
120                tpl.add_partition(split.topic.as_str(), split.partition);
121            }
122
123            let (low, high) = consumer
124                .fetch_watermarks(
125                    split.topic.as_str(),
126                    split.partition,
127                    properties.common.sync_call_timeout,
128                )
129                .await?;
130            tracing::info!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
131            // note: low is inclusive, high is exclusive, start_offset is exclusive
132            if low == high || split.start_offset.is_some_and(|offset| offset + 1 >= high) {
133                backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
134            } else {
135                debug_assert!(high > 0);
136                backfill_info.insert(
137                    split.id(),
138                    BackfillInfo::HasDataToBackfill {
139                        latest_offset: (high - 1).to_string(),
140                    },
141                );
142            }
143        }
144        tracing::info!(
145            topic = properties.common.topic,
146            source_name = source_ctx.source_name,
147            fragment_id = source_ctx.fragment_id,
148            source_id = source_ctx.source_id.table_id,
149            actor_id = source_ctx.actor_id,
150            "backfill_info: {:?}",
151            backfill_info
152        );
153
154        consumer.assign(&tpl)?;
155
156        // The two parameters below are only used by developers for performance testing purposes,
157        // so we panic here on purpose if the input is not correctly recognized.
158        let bytes_per_second = match properties.bytes_per_second {
159            None => usize::MAX,
160            Some(number) => number
161                .parse::<usize>()
162                .expect("bytes.per.second expect usize"),
163        };
164        let max_num_messages = match properties.max_num_messages {
165            None => usize::MAX,
166            Some(number) => number
167                .parse::<usize>()
168                .expect("max.num.messages expect usize"),
169        };
170
171        Ok(Self {
172            consumer,
173            offsets,
174            splits,
175            backfill_info,
176            bytes_per_second,
177            sync_call_timeout: properties.common.sync_call_timeout,
178            max_num_messages,
179            parser_config,
180            source_ctx,
181        })
182    }
183
184    fn into_stream(self) -> BoxSourceChunkStream {
185        let parser_config = self.parser_config.clone();
186        let source_context = self.source_ctx.clone();
187        into_chunk_stream(self.into_data_stream(), parser_config, source_context)
188    }
189
190    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
191        self.backfill_info.clone()
192    }
193
194    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
195        let mut latest_splits: Vec<SplitImpl> = Vec::new();
196        let mut tpl = TopicPartitionList::with_capacity(self.splits.len());
197        for mut split in self.splits.clone() {
198            // we can't get latest offset if we use Offset::End, so we just fetch watermark here.
199            let (_low, high) = self
200                .consumer
201                .fetch_watermarks(
202                    split.topic.as_str(),
203                    split.partition,
204                    self.sync_call_timeout,
205                )
206                .await?;
207            tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::Offset(high))?;
208            split.start_offset = Some(high - 1);
209            latest_splits.push(split.into());
210        }
211        // replace the previous assignment
212        self.consumer.assign(&tpl)?;
213        Ok(latest_splits)
214    }
215}
216
217impl KafkaSplitReader {
218    #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
219    async fn into_data_stream(self) {
220        if self.offsets.values().all(|(start_offset, stop_offset)| {
221            match (start_offset, stop_offset) {
222                (Some(start), Some(stop)) if (*start + 1) >= *stop => true,
223                (_, Some(stop)) if *stop == 0 => true,
224                _ => false,
225            }
226        }) {
227            yield Vec::new();
228            return Ok(());
229        };
230
231        let mut stop_offsets: HashMap<_, _> = self
232            .offsets
233            .iter()
234            .flat_map(|(split_id, (_, stop_offset))| {
235                stop_offset.map(|offset| (split_id.clone() as SplitId, offset))
236            })
237            .collect();
238
239        let mut interval = tokio::time::interval(Duration::from_secs(1));
240        interval.tick().await;
241        let mut bytes_current_second = 0;
242        let mut num_messages = 0;
243        let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size;
244        let mut res = Vec::with_capacity(max_chunk_size);
245        // ingest kafka message header can be expensive, do it only when required
246        let require_message_header = self.parser_config.common.rw_columns.iter().any(|col_desc| {
247            matches!(
248                col_desc.additional_column.column_type,
249                Some(AdditionalColumnType::Headers(_) | AdditionalColumnType::HeaderInner(_))
250            )
251        });
252
253        let mut latest_message_id_metrics: HashMap<String, LabelGuardedIntGauge> = HashMap::new();
254
255        #[for_await]
256        'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(max_chunk_size) {
257            let msgs: Vec<_> = msgs
258                .into_iter()
259                .collect::<std::result::Result<_, KafkaError>>()?;
260
261            let mut split_msg_offsets = HashMap::new();
262
263            for msg in &msgs {
264                split_msg_offsets.insert(msg.partition(), msg.offset());
265            }
266
267            for (partition, offset) in split_msg_offsets {
268                let split_id = partition.to_string();
269                latest_message_id_metrics
270                    .entry(split_id.clone())
271                    .or_insert_with(|| {
272                        self.source_ctx
273                            .metrics
274                            .latest_message_id
275                            .with_guarded_label_values(&[
276                                // source name is not available here
277                                &self.source_ctx.source_id.to_string(),
278                                &self.source_ctx.actor_id.to_string(),
279                                &split_id,
280                            ])
281                    })
282                    .set(offset);
283            }
284
285            for msg in msgs {
286                let cur_offset = msg.offset();
287                bytes_current_second += match &msg.payload() {
288                    None => 0,
289                    Some(payload) => payload.len(),
290                };
291                num_messages += 1;
292                let source_message =
293                    SourceMessage::from_kafka_message(&msg, require_message_header);
294                let split_id = source_message.split_id.clone();
295                res.push(source_message);
296
297                if let Entry::Occupied(o) = stop_offsets.entry(split_id) {
298                    let stop_offset = *o.get();
299
300                    if cur_offset == stop_offset - 1 {
301                        tracing::debug!(
302                            "stop offset reached for split {}, stop reading, offset: {}, stop offset: {}",
303                            o.key(),
304                            cur_offset,
305                            stop_offset
306                        );
307
308                        o.remove();
309
310                        if stop_offsets.is_empty() {
311                            yield res;
312                            break 'for_outer_loop;
313                        }
314
315                        continue;
316                    }
317                }
318
319                if bytes_current_second > self.bytes_per_second {
320                    // swap to make compiler happy
321                    let mut cur = Vec::with_capacity(res.capacity());
322                    swap(&mut cur, &mut res);
323                    yield cur;
324                    interval.tick().await;
325                    bytes_current_second = 0;
326                    res.clear();
327                }
328                if num_messages >= self.max_num_messages {
329                    yield res;
330                    break 'for_outer_loop;
331                }
332            }
333            let mut cur = Vec::with_capacity(res.capacity());
334            swap(&mut cur, &mut res);
335            yield cur;
336            // don't clear `bytes_current_second` here as it is only related to `.tick()`.
337            // yield in the outer loop so that we can always guarantee that some messages are read
338            // every `MAX_CHUNK_SIZE`.
339        }
340        tracing::info!("kafka reader finished");
341    }
342}