risingwave_connector/source/kafka/source/reader.rs

use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::mem::swap;
use std::time::Duration;

use anyhow::Context;
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::try_stream;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;

use crate::error::ConnectorResult as Result;
use crate::parser::ParserConfig;
use crate::source::base::SourceMessage;
use crate::source::kafka::{
    KAFKA_ISOLATION_LEVEL, KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext,
};
use crate::source::{
    BackfillInfo, BoxSourceChunkStream, Column, SourceContextRef, SplitId, SplitImpl,
    SplitMetaData, SplitReader, into_chunk_stream,
};

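/// [`SplitReader`] implementation for Kafka: consumes the assigned Kafka splits
/// (topic partitions) through a single `StreamConsumer` and emits batches of
/// [`SourceMessage`]s that are parsed into stream chunks downstream.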
pub struct KafkaSplitReader {
    consumer: StreamConsumer<RwConsumerContext>,
    /// Per-split `(start_offset, stop_offset)` as configured on the assigned splits.
    offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
    backfill_info: HashMap<SplitId, BackfillInfo>,
    splits: Vec<KafkaSplit>,
    sync_call_timeout: Duration,
    bytes_per_second: usize,
    max_num_messages: usize,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
}

#[async_trait]
impl SplitReader for KafkaSplitReader {
    type Properties = KafkaProperties;
    type Split = KafkaSplit;

    async fn new(
        properties: KafkaProperties,
        splits: Vec<KafkaSplit>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        _columns: Option<Vec<Column>>,
    ) -> Result<Self> {
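        // Assemble the librdkafka client configuration from the user-facing
        // connector properties before creating the consumer.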
        let mut config = ClientConfig::new();

        let bootstrap_servers = &properties.connection.brokers;
        let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();

        config.set("enable.partition.eof", "false");
        config.set("auto.offset.reset", "smallest");
        config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
        config.set("bootstrap.servers", bootstrap_servers);

        properties.connection.set_security_properties(&mut config);
        properties.set_client(&mut config);

        config.set("group.id", properties.group_id(source_ctx.fragment_id));

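        // The shared consumer context carries broker-address rewrites (e.g. for
        // PrivateLink), a client identifier, native rdkafka metrics, and AWS auth
        // settings for MSK IAM.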
        let ctx_common = KafkaContextCommon::new(
            broker_rewrite_map,
            Some(format!(
                "fragment-{}-source-{}-actor-{}",
                source_ctx.fragment_id, source_ctx.source_id, source_ctx.actor_id
            )),
            Some(source_ctx.metrics.rdkafka_native_metric.clone()),
            properties.aws_auth_props,
            properties.connection.is_aws_msk_iam(),
        )
        .await?;

        let client_ctx = RwConsumerContext::new(ctx_common);
        let consumer: StreamConsumer<RwConsumerContext> = config
            .set_log_level(RDKafkaLogLevel::Info)
            .create_with_context(client_ctx)
            .await
            .context("failed to create kafka consumer")?;

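        // Build the partition assignment and, for each split, record its configured
        // start/stop offsets and whether there is historical data to backfill.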
        let mut tpl = TopicPartitionList::with_capacity(splits.len());

        let mut offsets = HashMap::new();
        let mut backfill_info = HashMap::new();
        for split in splits.clone() {
            offsets.insert(split.id(), (split.start_offset, split.stop_offset));

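            // `start_offset` is the offset of the last message already consumed,
            // so the consumer resumes from the next offset (`start_offset + 1`).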
            if let Some(offset) = split.start_offset {
                tpl.add_partition_offset(
                    split.topic.as_str(),
                    split.partition,
                    Offset::Offset(offset + 1),
                )?;
            } else {
                tpl.add_partition(split.topic.as_str(), split.partition);
            }

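            // Fetch the partition's low/high watermarks to decide whether this split
            // still has data between the resume position and the latest offset.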
            let (low, high) = consumer
                .fetch_watermarks(
                    split.topic.as_str(),
                    split.partition,
                    properties.common.sync_call_timeout,
                )
                .await?;
            tracing::info!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
            if low == high || split.start_offset.is_some_and(|offset| offset + 1 >= high) {
                backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
            } else {
                debug_assert!(high > 0);
                backfill_info.insert(
                    split.id(),
                    BackfillInfo::HasDataToBackfill {
                        latest_offset: (high - 1).to_string(),
                    },
                );
            }
        }
        tracing::info!(
            topic = properties.common.topic,
            source_name = source_ctx.source_name,
            fragment_id = source_ctx.fragment_id,
            source_id = source_ctx.source_id.table_id,
            actor_id = source_ctx.actor_id,
            "backfill_info: {:?}",
            backfill_info
        );

        consumer.assign(&tpl)?;

        // Optional read throttles: cap the bytes consumed per second and the total
        // number of messages; both default to unlimited when the properties are unset.
        let bytes_per_second = match properties.bytes_per_second {
            None => usize::MAX,
            Some(number) => number
                .parse::<usize>()
                .expect("bytes.per.second expect usize"),
        };
        let max_num_messages = match properties.max_num_messages {
            None => usize::MAX,
            Some(number) => number
                .parse::<usize>()
                .expect("max.num.messages expect usize"),
        };

        Ok(Self {
            consumer,
            offsets,
            splits,
            backfill_info,
            bytes_per_second,
            sync_call_timeout: properties.common.sync_call_timeout,
            max_num_messages,
            parser_config,
            source_ctx,
        })
    }

    fn into_stream(self) -> BoxSourceChunkStream {
        let parser_config = self.parser_config.clone();
        let source_context = self.source_ctx.clone();
        into_chunk_stream(self.into_data_stream(), parser_config, source_context)
    }

    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        self.backfill_info.clone()
    }

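    // Reassigns every split to its current high watermark so the reader skips the
    // remaining backlog, and returns the splits updated to point at the latest offsets.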
    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        let mut latest_splits: Vec<SplitImpl> = Vec::new();
        let mut tpl = TopicPartitionList::with_capacity(self.splits.len());
        for mut split in self.splits.clone() {
            let (_low, high) = self
                .consumer
                .fetch_watermarks(
                    split.topic.as_str(),
                    split.partition,
                    self.sync_call_timeout,
                )
                .await?;
            tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::Offset(high))?;
            split.start_offset = Some(high - 1);
            latest_splits.push(split.into());
        }
        self.consumer.assign(&tpl)?;
        Ok(latest_splits)
    }
}

impl KafkaSplitReader {
    #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
    async fn into_data_stream(self) {
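        // If every split is already exhausted (the resume position has reached the
        // stop offset, or the stop offset is 0), emit one empty batch and finish.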
        if self.offsets.values().all(|(start_offset, stop_offset)| {
            match (start_offset, stop_offset) {
                (Some(start), Some(stop)) if (*start + 1) >= *stop => true,
                (_, Some(stop)) if *stop == 0 => true,
                _ => false,
            }
        }) {
            yield Vec::new();
            return Ok(());
        };

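        // Track the remaining stop offset for each split that has one; a split is
        // removed from this map once its stop offset has been reached.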
        let mut stop_offsets: HashMap<_, _> = self
            .offsets
            .iter()
            .flat_map(|(split_id, (_, stop_offset))| {
                stop_offset.map(|offset| (split_id.clone() as SplitId, offset))
            })
            .collect();

        let mut interval = tokio::time::interval(Duration::from_secs(1));
        interval.tick().await;
        let mut bytes_current_second = 0;
        let mut num_messages = 0;
        let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size;
        let mut res = Vec::with_capacity(max_chunk_size);
        let require_message_header = self.parser_config.common.rw_columns.iter().any(|col_desc| {
            matches!(
                col_desc.additional_column.column_type,
                Some(AdditionalColumnType::Headers(_) | AdditionalColumnType::HeaderInner(_))
            )
        });

        let mut latest_message_id_metrics: HashMap<String, LabelGuardedIntGauge<3>> =
            HashMap::new();

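        // Main consume loop: pull messages from the consumer stream in chunks of at
        // most `max_chunk_size`, update per-partition offset metrics, and batch
        // messages into `res` until a yield condition is hit.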
        #[for_await]
        'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(max_chunk_size) {
            let msgs: Vec<_> = msgs
                .into_iter()
                .collect::<std::result::Result<_, KafkaError>>()?;

            let mut split_msg_offsets = HashMap::new();

            for msg in &msgs {
                split_msg_offsets.insert(msg.partition(), msg.offset());
            }

            for (partition, offset) in split_msg_offsets {
                let split_id = partition.to_string();
                latest_message_id_metrics
                    .entry(split_id.clone())
                    .or_insert_with(|| {
                        self.source_ctx
                            .metrics
                            .latest_message_id
                            .with_guarded_label_values(&[
                                &self.source_ctx.source_id.to_string(),
                                &self.source_ctx.actor_id.to_string(),
                                &split_id,
                            ])
                    })
                    .set(offset);
            }

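            // Convert each message, count bytes and messages for throttling, and stop
            // reading a split once its configured stop offset is reached.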
            for msg in msgs {
                let cur_offset = msg.offset();
                bytes_current_second += match &msg.payload() {
                    None => 0,
                    Some(payload) => payload.len(),
                };
                num_messages += 1;
                let source_message =
                    SourceMessage::from_kafka_message(&msg, require_message_header);
                let split_id = source_message.split_id.clone();
                res.push(source_message);

                if let Entry::Occupied(o) = stop_offsets.entry(split_id) {
                    let stop_offset = *o.get();

                    if cur_offset == stop_offset - 1 {
                        tracing::debug!(
                            "stop offset reached for split {}, stop reading, offset: {}, stop offset: {}",
                            o.key(),
                            cur_offset,
                            stop_offset
                        );

                        o.remove();

                        if stop_offsets.is_empty() {
                            yield res;
                            break 'for_outer_loop;
                        }

                        continue;
                    }
                }

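                // Throttling: once the per-second byte budget is exceeded, flush the
                // current batch and wait for the next interval tick; once the message
                // cap is hit, flush and stop reading entirely.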
                if bytes_current_second > self.bytes_per_second {
                    let mut cur = Vec::with_capacity(res.capacity());
                    swap(&mut cur, &mut res);
                    yield cur;
                    interval.tick().await;
                    bytes_current_second = 0;
                    res.clear();
                }
                if num_messages >= self.max_num_messages {
                    yield res;
                    break 'for_outer_loop;
                }
            }
            let mut cur = Vec::with_capacity(res.capacity());
            swap(&mut cur, &mut res);
            yield cur;
        }
        tracing::info!("kafka reader finished");
    }
}