risingwave_connector/source/kafka/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use serde::Deserialize;
use serde_with::{DisplayFromStr, serde_as};

use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;

mod client_context;
pub mod enumerator;
pub mod private_link;
pub mod source;
pub mod split;
pub mod stats;

pub use client_context::*;
pub use enumerator::*;
use risingwave_common::id::FragmentId;
pub use source::*;
pub use split::*;
use with_options::WithOptions;

use crate::connector_common::{KafkaCommon, RdKafkaPropertiesCommon};
use crate::source::SourceProperties;

pub const KAFKA_CONNECTOR: &str = "kafka";
pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server";
pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers";
pub const PRIVATELINK_CONNECTION: &str = "privatelink";

/// Properties for the rdkafka library. Leave a field as `None` to use the default value.
/// These properties are not intended to be exposed to users in the majority of cases.
///
/// See also <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>
#[serde_as]
#[derive(Clone, Debug, Deserialize, Default, WithOptions)]
pub struct RdKafkaPropertiesConsumer {
    /// Minimum number of messages per topic+partition librdkafka tries to maintain in the local
    /// consumer queue.
    #[serde(rename = "properties.queued.min.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub queued_min_messages: Option<usize>,

    /// Maximum number of kilobytes of queued pre-fetched messages in the local consumer queue.
    #[serde(rename = "properties.queued.max.messages.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub queued_max_messages_kbytes: Option<usize>,

    /// Maximum time the broker may wait to fill the Fetch response with `fetch.min.bytes` of
    /// messages.
    #[serde(rename = "properties.fetch.wait.max.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub fetch_wait_max_ms: Option<usize>,

    /// Minimum number of bytes the broker responds with. If `fetch.wait.max.ms` expires, the accumulated data will be sent to the client regardless of this setting.
    #[serde(rename = "properties.fetch.min.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_min_bytes: Option<usize>,

    /// Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value, it will gradually try to increase it until the entire message can be fetched.
    #[serde(rename = "properties.fetch.message.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_message_max_bytes: Option<usize>,

    /// How long to postpone the next fetch request for a topic+partition in case the current fetch
    /// queue thresholds (`queued.min.messages` or `queued.max.messages.kbytes`) have been
    /// exceeded. This property may need to be decreased if the queue thresholds are set low
    /// and the application is experiencing long (~1s) delays between messages. Low values may
    /// increase CPU utilization.
    #[serde(rename = "properties.fetch.queue.backoff.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub fetch_queue_backoff_ms: Option<usize>,

    /// Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in
    /// batches by the consumer and if the first message batch in the first non-empty partition of
    /// the Fetch request is larger than this value, then the message batch will still be returned
    /// to ensure the consumer can make progress. The maximum message batch size accepted by the
    /// broker is defined via `message.max.bytes` (broker config) or `max.message.bytes` (broker
    /// topic config). `fetch.max.bytes` is automatically adjusted upwards to be at least
    /// `message.max.bytes` (consumer config).
    #[serde(rename = "properties.fetch.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub fetch_max_bytes: Option<usize>,

    /// Whether to automatically and periodically commit offsets in the background.
    ///
    /// Note that RisingWave does NOT rely on committed offsets. Committing offsets is only for
    /// exposing progress for monitoring. Setting this to `false` can avoid creating consumer groups.
    ///
    /// default: true
    #[serde(rename = "properties.enable.auto.commit")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub enable_auto_commit: Option<bool>,
}

#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KafkaProperties {
    /// This parameter is not intended to be exposed to users.
    /// It applies to a single parallelism: the parallelism of a Kafka source equals the
    /// parallelism passed into the compute nodes, so users need to calculate the total
    /// number of bytes consumed across all parallelisms themselves.
    #[serde(rename = "bytes.per.second", alias = "kafka.bytes.per.second")]
    pub bytes_per_second: Option<String>,

    /// This parameter is not intended to be exposed to users.
    /// It applies to a single parallelism: the parallelism of a Kafka source equals the
    /// parallelism passed into the compute nodes, so users need to calculate the total
    /// number of messages consumed across all parallelisms themselves.
    #[serde(rename = "max.num.messages", alias = "kafka.max.num.messages")]
    pub max_num_messages: Option<String>,

    #[serde(rename = "scan.startup.mode", alias = "kafka.scan.startup.mode")]
    pub scan_startup_mode: Option<String>,

    #[serde(
        rename = "scan.startup.timestamp.millis",
        alias = "kafka.time.offset",
        alias = "scan.startup.timestamp_millis" // keep for compatibility
    )]
    pub time_offset: Option<String>,

    /// Specify a custom consumer group id prefix for the source.
    /// Defaults to `rw-consumer`.
    ///
    /// Notes:
    /// - Each job (materialized view) has a separate consumer group whose id
    ///   contains a generated suffix:
    ///   the consumer group will be `{group_id_prefix}-{fragment_id}`.
    /// - The consumer group is solely for monitoring progress in external
    ///   Kafka tools and for authorization. RisingWave does not rely on committed
    ///   offsets and does not join the consumer group. It just reports offsets
    ///   to the group.
    #[serde(rename = "group.id.prefix")]
    #[with_option(allow_alter_on_fly)]
    pub group_id_prefix: Option<String>,

    /// This parameter is used to tell `KafkaSplitReader` to produce `UpsertMessage`s, which
    /// combine both key and value fields of the Kafka message.
    /// TODO: Currently, `Option<bool>` cannot be parsed here.
    #[serde(rename = "upsert")]
    pub upsert: Option<String>,

    #[serde(flatten)]
    pub common: KafkaCommon,

    #[serde(flatten)]
    pub connection: KafkaConnectionProps,

    #[serde(flatten)]
    pub rdkafka_properties_common: RdKafkaPropertiesCommon,

    #[serde(flatten)]
    pub rdkafka_properties_consumer: RdKafkaPropertiesConsumer,

    #[serde(flatten)]
    pub privatelink_common: KafkaPrivateLinkCommon,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl EnforceSecret for KafkaProperties {
    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
        for prop in prop_iter {
            KafkaConnectionProps::enforce_one(prop)?;
            AwsAuthProps::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl SourceProperties for KafkaProperties {
    type Split = KafkaSplit;
    type SplitEnumerator = KafkaSplitEnumerator;
    type SplitReader = KafkaSplitReader;

    const SOURCE_NAME: &'static str = KAFKA_CONNECTOR;
}

impl crate::source::UnknownFields for KafkaProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

impl KafkaProperties {
    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        self.rdkafka_properties_common.set_client(c);
        self.rdkafka_properties_consumer.set_client(c);
    }

    pub fn group_id(&self, fragment_id: FragmentId) -> String {
        format!(
            "{}-{}",
            self.group_id_prefix.as_deref().unwrap_or("rw-consumer"),
            fragment_id
        )
    }
}

const KAFKA_ISOLATION_LEVEL: &str = "read_committed";

impl RdKafkaPropertiesConsumer {
    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        if let Some(v) = &self.queued_min_messages {
            c.set("queued.min.messages", v.to_string());
        }
        if let Some(v) = &self.queued_max_messages_kbytes {
            c.set("queued.max.messages.kbytes", v.to_string());
        }
        if let Some(v) = &self.fetch_wait_max_ms {
            c.set("fetch.wait.max.ms", v.to_string());
        }
        if let Some(v) = &self.fetch_min_bytes {
            c.set("fetch.min.bytes", v.to_string());
        }
        if let Some(v) = &self.fetch_message_max_bytes {
            c.set("fetch.message.max.bytes", v.to_string());
        }
        if let Some(v) = &self.fetch_queue_backoff_ms {
            c.set("fetch.queue.backoff.ms", v.to_string());
        }
        if let Some(v) = &self.fetch_max_bytes {
            c.set("fetch.max.bytes", v.to_string());
        }
        if let Some(v) = &self.enable_auto_commit {
            c.set("enable.auto.commit", v.to_string());
        }
    }
}

#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use maplit::btreemap;

    use super::*;

    #[test]
    fn test_parse_config_consumer_common() {
        let config: BTreeMap<String, String> = btreemap! {
            // common
            "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            // kafka props
            "scan.startup.mode".to_owned() => "earliest".to_owned(),
            // RdKafkaPropertiesCommon
            "properties.message.max.bytes".to_owned() => "12345".to_owned(),
            "properties.receive.message.max.bytes".to_owned() => "54321".to_owned(),
            // RdKafkaPropertiesConsumer
            "properties.queued.min.messages".to_owned() => "114514".to_owned(),
            "properties.queued.max.messages.kbytes".to_owned() => "114514".to_owned(),
            "properties.fetch.wait.max.ms".to_owned() => "114514".to_owned(),
            "properties.fetch.max.bytes".to_owned() => "114514".to_owned(),
            "properties.enable.auto.commit".to_owned() => "true".to_owned(),
            "properties.fetch.queue.backoff.ms".to_owned() => "114514".to_owned(),
            // PrivateLink
            "broker.rewrite.endpoints".to_owned() => "{\"broker1\": \"10.0.0.1:8001\"}".to_owned(),
        };

        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();

        assert_eq!(props.scan_startup_mode, Some("earliest".to_owned()));
        assert_eq!(
            props.rdkafka_properties_common.receive_message_max_bytes,
            Some(54321)
        );
        assert_eq!(
            props.rdkafka_properties_common.message_max_bytes,
            Some(12345)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.queued_min_messages,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.queued_max_messages_kbytes,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_wait_max_ms,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_max_bytes,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.enable_auto_commit,
            Some(true)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_queue_backoff_ms,
            Some(114514)
        );
        let hashmap: BTreeMap<String, String> = btreemap! {
            "broker1".to_owned() => "10.0.0.1:8001".to_owned()
        };
        assert_eq!(props.privatelink_common.broker_rewrite_map, Some(hashmap));
    }
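
    /// A minimal illustrative sketch in the spirit of the test above: it checks that
    /// `group.id.prefix` is parsed and that `set_client` forwards the `properties.*`
    /// consumer options to an rdkafka `ClientConfig` under their librdkafka names.
    /// The concrete values are arbitrary, and reading them back assumes
    /// `rdkafka::ClientConfig::get`.
    #[test]
    fn test_consumer_options_forwarded_to_client_config() {
        let config: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            "group.id.prefix".to_owned() => "my-prefix".to_owned(),
            "properties.queued.min.messages".to_owned() => "100000".to_owned(),
            "properties.enable.auto.commit".to_owned() => "false".to_owned(),
        };

        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();

        // The prefix is stored as-is; the full consumer group id used at runtime is
        // `{group_id_prefix}-{fragment_id}` (see `KafkaProperties::group_id`).
        assert_eq!(props.group_id_prefix.as_deref(), Some("my-prefix"));

        // `set_client` copies the parsed options into the rdkafka client configuration,
        // dropping the `properties.` prefix used in the `WITH` clause.
        let mut client_config = rdkafka::ClientConfig::new();
        props.set_client(&mut client_config);
        assert_eq!(client_config.get("queued.min.messages"), Some("100000"));
        assert_eq!(client_config.get("enable.auto.commit"), Some("false"));
    }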
}