// risingwave_connector/source/kafka/source/reader.rs

use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::mem::swap;
use std::time::Duration;

use anyhow::Context;
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::try_stream;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;

use crate::connector_common::read_kafka_log_level;
use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::source::base::SourceMessage;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext,
};
use crate::source::{
    BackfillInfo, BoxSourceChunkStream, Column, SourceContextRef, SplitId, SplitImpl,
    SplitMetaData, SplitReader, into_chunk_stream,
};

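/// Reads messages for the Kafka splits (topic partitions) assigned to one source actor.
/// It wraps an rdkafka `StreamConsumer`, remembers each split's start/stop offsets and
/// backfill info, and enforces the optional `bytes.per.second` / `max.num.messages` limits.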
pub struct KafkaSplitReader {
    consumer: StreamConsumer<RwConsumerContext>,
    offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
    backfill_info: HashMap<SplitId, BackfillInfo>,
    splits: Vec<KafkaSplit>,
    sync_call_timeout: Duration,
    bytes_per_second: usize,
    max_num_messages: usize,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
}

#[async_trait]
impl SplitReader for KafkaSplitReader {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    async fn new(
        properties: KafkaProperties,
        splits: Vec<KafkaSplit>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> Result<Self> {
        let mut config = ClientConfig::new();

        let bootstrap_servers = &properties.connection.brokers;
        let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();

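        // Base consumer configuration: disable partition EOF events, start from the earliest
        // offset when none is stored, honor the configured isolation level, and point the
        // client at the user-provided brokers before layering on security/client overrides
        // and a per-fragment group id.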
71 config.set("enable.partition.eof", "false");
73 config.set("auto.offset.reset", "smallest");
74 config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
75 config.set("bootstrap.servers", bootstrap_servers);
76
77 properties.connection.set_security_properties(&mut config);
78 properties.set_client(&mut config);
79
80 config.set("group.id", properties.group_id(source_ctx.fragment_id));
81
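        // The consumer context carries the PrivateLink broker rewrite map, a handle for
        // rdkafka's native statistics, and the AWS auth properties / MSK IAM flag; the
        // "fragment-…-source-…-actor-…" string identifies this reader in logs and metrics.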
        let ctx_common = KafkaContextCommon::new(
            broker_rewrite_map,
            Some(format!(
                "fragment-{}-source-{}-actor-{}",
                source_ctx.fragment_id, source_ctx.source_id, source_ctx.actor_id
            )),
            Some(source_ctx.metrics.rdkafka_native_metric.clone()),
            properties.aws_auth_props,
            properties.connection.is_aws_msk_iam(),
        )
        .await?;

        let client_ctx = RwConsumerContext::new(ctx_common);

        if let Some(log_level) = read_kafka_log_level() {
            config.set_log_level(log_level);
        }
        let consumer: StreamConsumer<RwConsumerContext> = config
            .create_with_context(client_ctx)
            .await
            .context("failed to create kafka consumer")?;

        let mut tpl = TopicPartitionList::with_capacity(splits.len());

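        // For each assigned split, record its start/stop offsets, seed the assignment list
        // (resuming one past the recovered start offset, if any), and fetch the partition
        // watermarks to decide whether there is any historical data left to backfill.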
        let mut offsets = HashMap::new();
        let mut backfill_info = HashMap::new();
        for split in splits.clone() {
            offsets.insert(split.id(), (split.start_offset, split.stop_offset));

            if let Some(offset) = split.start_offset {
                tpl.add_partition_offset(
                    split.topic.as_str(),
                    split.partition,
                    Offset::Offset(offset + 1),
                )?;
            } else {
                tpl.add_partition(split.topic.as_str(), split.partition);
            }

            let (low, high) = consumer
                .fetch_watermarks(
                    split.topic.as_str(),
                    split.partition,
                    properties.common.sync_call_timeout,
                )
                .await?;
            tracing::info!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
            if low == high || split.start_offset.is_some_and(|offset| offset + 1 >= high) {
                backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
            } else {
                debug_assert!(high > 0);
                backfill_info.insert(
                    split.id(),
                    BackfillInfo::HasDataToBackfill {
                        latest_offset: (high - 1).to_string(),
                    },
                );
            }
        }
        tracing::info!(
            topic = properties.common.topic,
            source_name = source_ctx.source_name,
            fragment_id = source_ctx.fragment_id,
            source_id = source_ctx.source_id.table_id,
            actor_id = source_ctx.actor_id,
            "backfill_info: {:?}",
            backfill_info
        );

        consumer.assign(&tpl)?;

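        // Optional limits; when unset they default to `usize::MAX`, i.e. effectively unlimited.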
        let bytes_per_second = match properties.bytes_per_second {
            None => usize::MAX,
            Some(number) => number
                .parse::<usize>()
                .expect("bytes.per.second expect usize"),
        };
        let max_num_messages = match properties.max_num_messages {
            None => usize::MAX,
            Some(number) => number
                .parse::<usize>()
                .expect("max.num.messages expect usize"),
        };

        Ok(Self {
            consumer,
            offsets,
            splits,
            backfill_info,
            bytes_per_second,
            sync_call_timeout: properties.common.sync_call_timeout,
            max_num_messages,
            parser_config,
            source_ctx,
        })
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        let parser_config = self.parser_config.clone();
        let source_context = self.source_ctx.clone();
        into_chunk_stream(self.into_data_stream(), parser_config, source_context)
    }

    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        self.backfill_info.clone()
    }

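    /// Re-point every assigned partition at its current high watermark and return the updated
    /// splits, each with `start_offset` set to the last existing message (`high - 1`).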
    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        let mut latest_splits: Vec<SplitImpl> = Vec::new();
        let mut tpl = TopicPartitionList::with_capacity(self.splits.len());
        for mut split in self.splits.clone() {
            let (_low, high) = self
                .consumer
                .fetch_watermarks(
                    split.topic.as_str(),
                    split.partition,
                    self.sync_call_timeout,
                )
                .await?;
            tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::Offset(high))?;
            split.start_offset = Some(high - 1);
            latest_splits.push(split.into());
        }
        self.consumer.assign(&tpl)?;
        Ok(latest_splits)
    }
}

impl KafkaSplitReader {
    #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
    async fn into_data_stream(self) {
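        // If every split is already exhausted (start + 1 >= stop, or stop == 0), emit a single
        // empty batch and finish immediately.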
        if self.offsets.values().all(|(start_offset, stop_offset)| {
            match (start_offset, stop_offset) {
                (Some(start), Some(stop)) if (*start + 1) >= *stop => true,
                (_, Some(stop)) if *stop == 0 => true,
                _ => false,
            }
        }) {
            yield Vec::new();
            return Ok(());
        };

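        // Exclusive stop offsets per split; a split is dropped from this map once its stop
        // offset has been reached, and reading ends when the map becomes empty.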
        let mut stop_offsets: HashMap<_, _> = self
            .offsets
            .iter()
            .flat_map(|(split_id, (_, stop_offset))| {
                stop_offset.map(|offset| (split_id.clone() as SplitId, offset))
            })
            .collect();

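        // A 1-second tick drives the bytes-per-second throttle below; batches are capped at the
        // source's configured chunk size.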
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        interval.tick().await;
        let mut bytes_current_second = 0;
        let mut num_messages = 0;
        let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size;
        let mut res = Vec::with_capacity(max_chunk_size);
        let require_message_header = self.parser_config.common.rw_columns.iter().any(|col_desc| {
            matches!(
                col_desc.additional_column.column_type,
                Some(AdditionalColumnType::Headers(_) | AdditionalColumnType::HeaderInner(_))
            )
        });

        let mut latest_message_id_metrics: HashMap<String, LabelGuardedIntGauge> = HashMap::new();

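        // Main consume loop: pull up to `max_chunk_size` already-ready messages at a time,
        // update the per-partition `latest_message_id` gauge, convert each message into a
        // `SourceMessage`, and yield batches downstream.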
        #[for_await]
        'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(max_chunk_size) {
            let msgs: Vec<_> = msgs
                .into_iter()
                .collect::<std::result::Result<_, KafkaError>>()?;

            let mut split_msg_offsets = HashMap::new();

            for msg in &msgs {
                split_msg_offsets.insert(msg.partition(), msg.offset());
            }

            for (partition, offset) in split_msg_offsets {
                let split_id = partition.to_string();
                latest_message_id_metrics
                    .entry(split_id.clone())
                    .or_insert_with(|| {
                        self.source_ctx
                            .metrics
                            .latest_message_id
                            .with_guarded_label_values(&[
                                &self.source_ctx.source_id.to_string(),
                                &self.source_ctx.actor_id.to_string(),
                                &split_id,
                            ])
                    })
                    .set(offset);
            }

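            // Convert and buffer each message, stopping a split once it hits its stop offset
            // and enforcing the byte/message limits as we go.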
            for msg in msgs {
                let cur_offset = msg.offset();
                bytes_current_second += match &msg.payload() {
                    None => 0,
                    Some(payload) => payload.len(),
                };
                num_messages += 1;
                let source_message =
                    SourceMessage::from_kafka_message(&msg, require_message_header);
                let split_id = source_message.split_id.clone();
                res.push(source_message);

                if let Entry::Occupied(o) = stop_offsets.entry(split_id) {
                    let stop_offset = *o.get();

                    if cur_offset == stop_offset - 1 {
                        tracing::debug!(
                            "stop offset reached for split {}, stop reading, offset: {}, stop offset: {}",
                            o.key(),
                            cur_offset,
                            stop_offset
                        );

                        o.remove();

                        if stop_offsets.is_empty() {
                            yield res;
                            break 'for_outer_loop;
                        }

                        continue;
                    }
                }

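                // Throttle: once this second's byte budget is spent, flush the batch collected so
                // far and wait for the next interval tick before reading more.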
                if bytes_current_second > self.bytes_per_second {
                    let mut cur = Vec::with_capacity(res.capacity());
                    swap(&mut cur, &mut res);
                    yield cur;
                    interval.tick().await;
                    bytes_current_second = 0;
                    res.clear();
                }
                if num_messages >= self.max_num_messages {
                    yield res;
                    break 'for_outer_loop;
                }
            }
            let mut cur = Vec::with_capacity(res.capacity());
            swap(&mut cur, &mut res);
            yield cur;
        }
        tracing::info!("kafka reader finished");
    }
}