risingwave_connector/source/kafka/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use serde::Deserialize;
use serde_with::{DisplayFromStr, serde_as};

use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon};

mod client_context;
pub mod enumerator;
pub mod private_link;
pub mod source;
pub mod split;
pub mod stats;

pub use client_context::*;
pub use enumerator::*;
pub use source::*;
pub use split::*;
use with_options::WithOptions;

use crate::connector_common::{KafkaCommon, RdKafkaPropertiesCommon};
use crate::source::SourceProperties;

pub const KAFKA_CONNECTOR: &str = "kafka";
pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server";
pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers";
pub const PRIVATELINK_CONNECTION: &str = "privatelink";

/// Properties for the rdkafka library. Leave a field as `None` to use the default value.
/// These properties are not intended to be exposed to users in the majority of cases.
///
/// See also <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>
#[serde_as]
#[derive(Clone, Debug, Deserialize, Default, WithOptions)]
pub struct RdKafkaPropertiesConsumer {
    /// Minimum number of messages per topic+partition librdkafka tries to maintain in the local
    /// consumer queue.
    #[serde(rename = "properties.queued.min.messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub queued_min_messages: Option<usize>,

    /// Maximum number of kilobytes of queued pre-fetched messages in the local consumer queue.
    #[serde(rename = "properties.queued.max.messages.kbytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub queued_max_messages_kbytes: Option<usize>,

    /// Maximum time the broker may wait to fill the Fetch response with `fetch.min.bytes` of
    /// messages.
    #[serde(rename = "properties.fetch.wait.max.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_wait_max_ms: Option<usize>,

    /// Minimum number of bytes the broker responds with. If `fetch.wait.max.ms` expires, the
    /// accumulated data will be sent to the client regardless of this setting.
    #[serde(rename = "properties.fetch.min.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_min_bytes: Option<usize>,

    /// Initial maximum number of bytes per topic+partition to request when fetching messages
    /// from the broker. If the client encounters a message larger than this value it will
    /// gradually try to increase it until the entire message can be fetched.
    #[serde(rename = "properties.fetch.message.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_message_max_bytes: Option<usize>,

    /// How long to postpone the next fetch request for a topic+partition in case the current fetch
    /// queue thresholds (`queued.min.messages` or `queued.max.messages.kbytes`) have been
    /// exceeded. This property may need to be decreased if the queue thresholds are set low
    /// and the application is experiencing long (~1s) delays between messages. Low values may
    /// increase CPU utilization.
    #[serde(rename = "properties.fetch.queue.backoff.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_queue_backoff_ms: Option<usize>,

    /// Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in
    /// batches by the consumer and if the first message batch in the first non-empty partition of
    /// the Fetch request is larger than this value, then the message batch will still be returned
    /// to ensure the consumer can make progress. The maximum message batch size accepted by the
    /// broker is defined via `message.max.bytes` (broker config) or `max.message.bytes` (broker
    /// topic config). `fetch.max.bytes` is automatically adjusted upwards to be at least
    /// `message.max.bytes` (consumer config).
    #[serde(rename = "properties.fetch.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub fetch_max_bytes: Option<usize>,

    /// Whether to automatically and periodically commit offsets in the background.
    ///
    /// Note that RisingWave does NOT rely on committed offsets. Committing offsets only exposes
    /// progress for monitoring. Setting this to `false` avoids creating consumer groups.
    ///
    /// default: true
    #[serde(rename = "properties.enable.auto.commit")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub enable_auto_commit: Option<bool>,
}

#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct KafkaProperties {
    /// This parameter is not intended to be exposed to users.
    /// It applies to a single parallelism: the parallelism of a Kafka source equals the
    /// parallelism passed into the compute nodes, so users need to calculate the total
    /// number of bytes consumed across all parallelisms by themselves.
    #[serde(rename = "bytes.per.second", alias = "kafka.bytes.per.second")]
    pub bytes_per_second: Option<String>,

    /// This parameter is not intended to be exposed to users.
    /// It applies to a single parallelism: the parallelism of a Kafka source equals the
    /// parallelism passed into the compute nodes, so users need to calculate the total
    /// number of messages consumed across all parallelisms by themselves.
    #[serde(rename = "max.num.messages", alias = "kafka.max.num.messages")]
    pub max_num_messages: Option<String>,

    #[serde(rename = "scan.startup.mode", alias = "kafka.scan.startup.mode")]
    pub scan_startup_mode: Option<String>,

    #[serde(
        rename = "scan.startup.timestamp.millis",
        alias = "kafka.time.offset",
        alias = "scan.startup.timestamp_millis" // keep for compatibility
    )]
    pub time_offset: Option<String>,

    /// Specify a custom consumer group id prefix for the source.
    /// Defaults to `rw-consumer`.
    ///
    /// Notes:
    /// - Each job (materialized view) will have a separate consumer group whose id
    ///   contains a generated suffix:
    ///   the consumer group will be `{group_id_prefix}-{fragment_id}`.
    /// - The consumer group is solely for monitoring progress in some external
    ///   Kafka tools, and for authorization. RisingWave does not rely on committed
    ///   offsets, and does not join the consumer group. It just reports offsets
    ///   to the group.
    #[serde(rename = "group.id.prefix")]
    pub group_id_prefix: Option<String>,

    /// This parameter is used to tell `KafkaSplitReader` to produce `UpsertMessage`s, which
    /// combine both key and value fields of the Kafka message.
    /// TODO: Currently, `Option<bool>` can not be parsed here.
    #[serde(rename = "upsert")]
    pub upsert: Option<String>,

    #[serde(flatten)]
    pub common: KafkaCommon,

    #[serde(flatten)]
    pub connection: KafkaConnectionProps,

    #[serde(flatten)]
    pub rdkafka_properties_common: RdKafkaPropertiesCommon,

    #[serde(flatten)]
    pub rdkafka_properties_consumer: RdKafkaPropertiesConsumer,

    #[serde(flatten)]
    pub privatelink_common: KafkaPrivateLinkCommon,

    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

impl SourceProperties for KafkaProperties {
    type Split = KafkaSplit;
    type SplitEnumerator = KafkaSplitEnumerator;
    type SplitReader = KafkaSplitReader;

    const SOURCE_NAME: &'static str = KAFKA_CONNECTOR;
}

impl crate::source::UnknownFields for KafkaProperties {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

impl KafkaProperties {
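    /// Applies all rdkafka properties carried by this struct (both the common and
    /// the consumer-specific groups) to the given rdkafka `ClientConfig`.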
    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        self.rdkafka_properties_common.set_client(c);
        self.rdkafka_properties_consumer.set_client(c);
    }

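    /// Builds the consumer group id for the given fragment:
    /// `{group_id_prefix}-{fragment_id}`, with the prefix defaulting to `rw-consumer`.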
    pub fn group_id(&self, fragment_id: u32) -> String {
        format!(
            "{}-{}",
            self.group_id_prefix.as_deref().unwrap_or("rw-consumer"),
            fragment_id
        )
    }
}

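/// Kafka `isolation.level` value: consume only messages from committed transactions.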
const KAFKA_ISOLATION_LEVEL: &str = "read_committed";

impl RdKafkaPropertiesConsumer {
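    /// Copies every field that is `Some` into the rdkafka `ClientConfig` under its
    /// librdkafka key (the WITH-option name without the `properties.` prefix).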
    pub fn set_client(&self, c: &mut rdkafka::ClientConfig) {
        if let Some(v) = &self.queued_min_messages {
            c.set("queued.min.messages", v.to_string());
        }
        if let Some(v) = &self.queued_max_messages_kbytes {
            c.set("queued.max.messages.kbytes", v.to_string());
        }
        if let Some(v) = &self.fetch_wait_max_ms {
            c.set("fetch.wait.max.ms", v.to_string());
        }
        if let Some(v) = &self.fetch_min_bytes {
            c.set("fetch.min.bytes", v.to_string());
        }
        if let Some(v) = &self.fetch_message_max_bytes {
            c.set("fetch.message.max.bytes", v.to_string());
        }
        if let Some(v) = &self.fetch_queue_backoff_ms {
            c.set("fetch.queue.backoff.ms", v.to_string());
        }
        if let Some(v) = &self.fetch_max_bytes {
            c.set("fetch.max.bytes", v.to_string());
        }
        if let Some(v) = &self.enable_auto_commit {
            c.set("enable.auto.commit", v.to_string());
        }
    }
}

#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use maplit::btreemap;

    use super::*;

    #[test]
    fn test_parse_config_consumer_common() {
        let config: BTreeMap<String, String> = btreemap! {
            // common
            "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
            // kafka props
            "scan.startup.mode".to_owned() => "earliest".to_owned(),
            // RdKafkaPropertiesCommon
            "properties.message.max.bytes".to_owned() => "12345".to_owned(),
            "properties.receive.message.max.bytes".to_owned() => "54321".to_owned(),
            // RdKafkaPropertiesConsumer
            "properties.queued.min.messages".to_owned() => "114514".to_owned(),
            "properties.queued.max.messages.kbytes".to_owned() => "114514".to_owned(),
            "properties.fetch.wait.max.ms".to_owned() => "114514".to_owned(),
            "properties.fetch.max.bytes".to_owned() => "114514".to_owned(),
            "properties.enable.auto.commit".to_owned() => "true".to_owned(),
            "properties.fetch.queue.backoff.ms".to_owned() => "114514".to_owned(),
            // PrivateLink
            "broker.rewrite.endpoints".to_owned() => "{\"broker1\": \"10.0.0.1:8001\"}".to_owned(),
        };

        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();

        assert_eq!(props.scan_startup_mode, Some("earliest".to_owned()));
        assert_eq!(
            props.rdkafka_properties_common.receive_message_max_bytes,
            Some(54321)
        );
        assert_eq!(
            props.rdkafka_properties_common.message_max_bytes,
            Some(12345)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.queued_min_messages,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.queued_max_messages_kbytes,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_wait_max_ms,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_max_bytes,
            Some(114514)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.enable_auto_commit,
            Some(true)
        );
        assert_eq!(
            props.rdkafka_properties_consumer.fetch_queue_backoff_ms,
            Some(114514)
        );
        let expected_rewrite_map: BTreeMap<String, String> = btreemap! {
            "broker1".to_owned() => "10.0.0.1:8001".to_owned()
        };
        assert_eq!(
            props.privatelink_common.broker_rewrite_map,
            Some(expected_rewrite_map)
        );
    }
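
    // A minimal sketch exercising `KafkaProperties::group_id`, reusing the
    // deserialization pattern of the test above; the prefix "my-prefix" and
    // fragment id 42 are arbitrary example values.
    #[test]
    fn test_group_id_prefix() {
        let base: BTreeMap<String, String> = btreemap! {
            "properties.bootstrap.server".to_owned() => "127.0.0.1:9092".to_owned(),
            "topic".to_owned() => "test".to_owned(),
        };

        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(base.clone()).unwrap()).unwrap();
        // Without an explicit prefix, the default `rw-consumer` is used.
        assert_eq!(props.group_id(42), "rw-consumer-42");

        let mut config = base;
        config.insert("group.id.prefix".to_owned(), "my-prefix".to_owned());
        let props: KafkaProperties =
            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();
        assert_eq!(props.group_id(42), "my-prefix-42");
    }

    // A minimal sketch checking that `RdKafkaPropertiesConsumer::set_client`
    // forwards `Some` fields to rdkafka's `ClientConfig` under their librdkafka
    // keys; assumes `ClientConfig::get` for reading values back.
    #[test]
    fn test_set_client_consumer_props() {
        let consumer = RdKafkaPropertiesConsumer {
            queued_min_messages: Some(1000),
            enable_auto_commit: Some(false),
            ..Default::default()
        };
        let mut config = rdkafka::ClientConfig::new();
        consumer.set_client(&mut config);
        assert_eq!(config.get("queued.min.messages"), Some("1000"));
        assert_eq!(config.get("enable.auto.commit"), Some("false"));
        // Fields left as `None` are not written at all.
        assert_eq!(config.get("fetch.max.bytes"), None);
    }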
}