risingwave_connector/source/nats/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod enumerator;
16pub use enumerator::NatsSplitEnumerator;
17pub mod source;
18pub mod split;
19
20use std::collections::HashMap;
21use std::fmt::Display;
22use std::time::Duration;
23
24use async_nats::jetstream::consumer::pull::Config;
25use async_nats::jetstream::consumer::{AckPolicy, ReplayPolicy};
26use serde::Deserialize;
27use serde_with::{DisplayFromStr, serde_as};
28use thiserror::Error;
29use with_options::WithOptions;
30
31use crate::connector_common::NatsCommon;
32use crate::enforce_secret::EnforceSecret;
33use crate::error::{ConnectorError, ConnectorResult};
34use crate::source::SourceProperties;
35use crate::source::nats::source::{NatsSplit, NatsSplitReader};
36use crate::{
37    deserialize_optional_string_seq_from_string, deserialize_optional_u64_seq_from_string,
38};
39
40#[derive(Debug, Clone, Error)]
41pub struct NatsJetStreamError(String);
42
43impl Display for NatsJetStreamError {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        write!(f, "{}", self.0)
46    }
47}
48
49pub const NATS_CONNECTOR: &str = "nats";
50
51pub struct AckPolicyWrapper;
52
53impl AckPolicyWrapper {
54    pub fn parse_str(s: &str) -> Result<AckPolicy, NatsJetStreamError> {
55        match s {
56            "none" => Ok(AckPolicy::None),
57            "all" => Ok(AckPolicy::All),
58            "explicit" => Ok(AckPolicy::Explicit),
59            _ => Err(NatsJetStreamError(format!(
60                "Invalid AckPolicy '{}', expect `none`, `all`, and `explicit`",
61                s
62            ))),
63        }
64    }
65}
66
67pub struct ReplayPolicyWrapper;
68
69impl ReplayPolicyWrapper {
70    pub fn parse_str(s: &str) -> Result<ReplayPolicy, NatsJetStreamError> {
71        match s {
72            "instant" => Ok(ReplayPolicy::Instant),
73            "original" => Ok(ReplayPolicy::Original),
74            _ => Err(NatsJetStreamError(format!(
75                "Invalid ReplayPolicy '{}', expect `instant` and `original`",
76                s
77            ))),
78        }
79    }
80}
81
82#[serde_as]
83#[derive(Clone, Debug, Deserialize, WithOptions)]
84pub struct NatsProperties {
85    #[serde(flatten)]
86    pub common: NatsCommon,
87
88    #[serde(flatten)]
89    pub nats_properties_consumer: NatsPropertiesConsumer,
90
91    #[serde(rename = "scan.startup.mode")]
92    pub scan_startup_mode: Option<String>,
93
94    #[serde(
95        rename = "scan.startup.timestamp.millis",
96        alias = "scan.startup.timestamp_millis"
97    )]
98    #[serde_as(as = "Option<DisplayFromStr>")]
99    pub start_timestamp_millis: Option<i64>,
100
101    #[serde(rename = "stream")]
102    pub stream: String,
103
104    #[serde(rename = "consumer.durable_name")]
105    pub durable_consumer_name: String,
106
107    #[serde(flatten)]
108    pub unknown_fields: HashMap<String, String>,
109}
110
111impl EnforceSecret for NatsProperties {
112    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
113        for prop in prop_iter {
114            NatsCommon::enforce_one(prop)?;
115        }
116        Ok(())
117    }
118}
119
120impl NatsProperties {
121    pub fn set_config(&self, c: &mut Config) {
122        self.nats_properties_consumer.set_config(c);
123    }
124}
125
126/// Properties for the async-nats library.
127/// See <https://docs.rs/async-nats/latest/async_nats/jetstream/consumer/struct.Config.html>
128#[serde_as]
129#[derive(Clone, Debug, Deserialize, WithOptions)]
130pub struct NatsPropertiesConsumer {
131    #[serde(rename = "consumer.deliver_subject")]
132    pub deliver_subject: Option<String>,
133
134    #[serde(rename = "consumer.name")]
135    pub name: Option<String>,
136
137    #[serde(rename = "consumer.description")]
138    pub description: Option<String>,
139
140    #[serde(rename = "consumer.deliver_policy")]
141    #[serde_as(as = "Option<DisplayFromStr>")]
142    pub deliver_policy: Option<String>,
143
144    #[serde(rename = "consumer.ack_policy")]
145    #[serde_as(as = "Option<DisplayFromStr>")]
146    pub ack_policy: Option<String>,
147
148    #[serde(rename = "consumer.ack_wait.sec")]
149    #[serde_as(as = "Option<DisplayFromStr>")]
150    pub ack_wait: Option<u64>,
151
152    #[serde(rename = "consumer.max_deliver")]
153    #[serde_as(as = "Option<DisplayFromStr>")]
154    pub max_deliver: Option<i64>,
155
156    #[serde(rename = "consumer.filter_subject")]
157    pub filter_subject: Option<String>,
158
159    #[serde(
160        rename = "consumer.filter_subjects",
161        default,
162        deserialize_with = "deserialize_optional_string_seq_from_string"
163    )]
164    pub filter_subjects: Option<Vec<String>>,
165
166    #[serde(rename = "consumer.replay_policy")]
167    #[serde_as(as = "Option<DisplayFromStr>")]
168    pub replay_policy: Option<String>,
169
170    #[serde(rename = "consumer.rate_limit")]
171    #[serde_as(as = "Option<DisplayFromStr>")]
172    pub rate_limit: Option<u64>,
173
174    #[serde(rename = "consumer.sample_frequency")]
175    #[serde_as(as = "Option<DisplayFromStr>")]
176    pub sample_frequency: Option<u8>,
177
178    #[serde(rename = "consumer.max_waiting")]
179    #[serde_as(as = "Option<DisplayFromStr>")]
180    pub max_waiting: Option<i64>,
181
182    #[serde(rename = "consumer.max_ack_pending")]
183    #[serde_as(as = "Option<DisplayFromStr>")]
184    pub max_ack_pending: Option<i64>,
185
186    #[serde(rename = "consumer.headers_only")]
187    #[serde_as(as = "Option<DisplayFromStr>")]
188    pub headers_only: Option<bool>,
189
190    #[serde(rename = "consumer.max_batch")]
191    #[serde_as(as = "Option<DisplayFromStr>")]
192    pub max_batch: Option<i64>,
193
194    #[serde(rename = "consumer.max_bytes")]
195    #[serde_as(as = "Option<DisplayFromStr>")]
196    pub max_bytes: Option<i64>,
197
198    #[serde(rename = "consumer.max_expires.sec")]
199    #[serde_as(as = "Option<DisplayFromStr>")]
200    pub max_expires: Option<u64>,
201
202    #[serde(rename = "consumer.inactive_threshold.sec")]
203    #[serde_as(as = "Option<DisplayFromStr>")]
204    pub inactive_threshold: Option<u64>,
205
206    #[serde(rename = "consumer.num.replicas", alias = "consumer.num_replicas")]
207    #[serde_as(as = "Option<DisplayFromStr>")]
208    pub num_replicas: Option<usize>,
209
210    #[serde(rename = "consumer.memory_storage")]
211    #[serde_as(as = "Option<DisplayFromStr>")]
212    pub memory_storage: Option<bool>,
213
214    #[serde(
215        rename = "consumer.backoff.sec",
216        default,
217        deserialize_with = "deserialize_optional_u64_seq_from_string"
218    )]
219    pub backoff: Option<Vec<u64>>,
220}
221
222impl NatsPropertiesConsumer {
223    pub fn set_config(&self, c: &mut Config) {
224        if let Some(v) = &self.name {
225            c.name = Some(v.clone())
226        }
227        if let Some(v) = &self.description {
228            c.description = Some(v.clone())
229        }
230        if let Some(v) = &self.ack_policy {
231            c.ack_policy = AckPolicyWrapper::parse_str(v).unwrap()
232        }
233        if let Some(v) = &self.ack_wait {
234            c.ack_wait = Duration::from_secs(*v)
235        }
236        if let Some(v) = &self.max_deliver {
237            c.max_deliver = *v
238        }
239        if let Some(v) = &self.filter_subject {
240            c.filter_subject = v.clone()
241        }
242        if let Some(v) = &self.filter_subjects {
243            c.filter_subjects = v.clone()
244        }
245        if let Some(v) = &self.replay_policy {
246            c.replay_policy = ReplayPolicyWrapper::parse_str(v).unwrap()
247        }
248        if let Some(v) = &self.rate_limit {
249            c.rate_limit = *v
250        }
251        if let Some(v) = &self.sample_frequency {
252            c.sample_frequency = *v
253        }
254        if let Some(v) = &self.max_waiting {
255            c.max_waiting = *v
256        }
257        if let Some(v) = &self.max_ack_pending {
258            c.max_ack_pending = *v
259        }
260        if let Some(v) = &self.headers_only {
261            c.headers_only = *v
262        }
263        if let Some(v) = &self.max_batch {
264            c.max_batch = *v
265        }
266        if let Some(v) = &self.max_bytes {
267            c.max_bytes = *v
268        }
269        if let Some(v) = &self.max_expires {
270            c.max_expires = Duration::from_secs(*v)
271        }
272        if let Some(v) = &self.inactive_threshold {
273            c.inactive_threshold = Duration::from_secs(*v)
274        }
275        if let Some(v) = &self.num_replicas {
276            c.num_replicas = *v
277        }
278        if let Some(v) = &self.memory_storage {
279            c.memory_storage = *v
280        }
281        if let Some(v) = &self.backoff {
282            c.backoff = v.iter().map(|&x| Duration::from_secs(x)).collect()
283        }
284    }
285
286    pub fn get_ack_policy(&self) -> ConnectorResult<AckPolicy> {
287        match &self.ack_policy {
288            Some(policy) => Ok(AckPolicyWrapper::parse_str(policy).map_err(ConnectorError::from)?),
289            None => Ok(AckPolicy::None),
290        }
291    }
292}
293
294impl SourceProperties for NatsProperties {
295    type Split = NatsSplit;
296    type SplitEnumerator = NatsSplitEnumerator;
297    type SplitReader = NatsSplitReader;
298
299    const SOURCE_NAME: &'static str = NATS_CONNECTOR;
300}
301
302impl crate::source::UnknownFields for NatsProperties {
303    fn unknown_fields(&self) -> HashMap<String, String> {
304        self.unknown_fields.clone()
305    }
306}
307
308#[cfg(test)]
309mod test {
310    use std::collections::BTreeMap;
311
312    use maplit::btreemap;
313
314    use super::*;
315
316    #[test]
317    fn test_parse_config_consumer() {
318        let config: BTreeMap<String, String> = btreemap! {
319            "stream".to_owned() => "risingwave".to_owned(),
320
321            // NATS common
322            "subject".to_owned() => "subject1".to_owned(),
323            "server_url".to_owned() => "nats-server:4222".to_owned(),
324            "connect_mode".to_owned() => "plain".to_owned(),
325            "type".to_owned() => "append-only".to_owned(),
326
327            // NATS properties consumer
328            "consumer.name".to_owned() => "foobar".to_owned(),
329            "consumer.durable_name".to_owned() => "durable_foobar".to_owned(),
330            "consumer.description".to_owned() => "A description".to_owned(),
331            "consumer.ack_policy".to_owned() => "all".to_owned(),
332            "consumer.ack_wait.sec".to_owned() => "10".to_owned(),
333            "consumer.max_deliver".to_owned() => "10".to_owned(),
334            "consumer.filter_subject".to_owned() => "subject".to_owned(),
335            "consumer.filter_subjects".to_owned() => "subject1,subject2".to_owned(),
336            "consumer.replay_policy".to_owned() => "instant".to_owned(),
337            "consumer.rate_limit".to_owned() => "100".to_owned(),
338            "consumer.sample_frequency".to_owned() => "1".to_owned(),
339            "consumer.max_waiting".to_owned() => "5".to_owned(),
340            "consumer.max_ack_pending".to_owned() => "100".to_owned(),
341            "consumer.headers_only".to_owned() => "true".to_owned(),
342            "consumer.max_batch".to_owned() => "10".to_owned(),
343            "consumer.max_bytes".to_owned() => "1024".to_owned(),
344            "consumer.max_expires.sec".to_owned() => "24".to_owned(),
345            "consumer.inactive_threshold.sec".to_owned() => "10".to_owned(),
346            "consumer.num_replicas".to_owned() => "3".to_owned(),
347            "consumer.memory_storage".to_owned() => "true".to_owned(),
348            "consumer.backoff.sec".to_owned() => "2,10,15".to_owned(),
349            "durable_consumer_name".to_owned() => "test_durable_consumer".to_owned(),
350
351        };
352
353        let props: NatsProperties =
354            serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap();
355
356        assert_eq!(
357            props.nats_properties_consumer.name,
358            Some("foobar".to_owned())
359        );
360        assert_eq!(props.durable_consumer_name, "durable_foobar".to_owned());
361        assert_eq!(
362            props.nats_properties_consumer.description,
363            Some("A description".to_owned())
364        );
365        assert_eq!(
366            props.nats_properties_consumer.ack_policy,
367            Some("all".to_owned())
368        );
369        assert_eq!(props.nats_properties_consumer.ack_wait, Some(10));
370        assert_eq!(
371            props.nats_properties_consumer.filter_subjects,
372            Some(vec!["subject1".to_owned(), "subject2".to_owned()])
373        );
374        assert_eq!(
375            props.nats_properties_consumer.replay_policy,
376            Some("instant".to_owned())
377        );
378        assert_eq!(props.nats_properties_consumer.rate_limit, Some(100));
379        assert_eq!(props.nats_properties_consumer.sample_frequency, Some(1));
380        assert_eq!(props.nats_properties_consumer.max_waiting, Some(5));
381        assert_eq!(props.nats_properties_consumer.max_ack_pending, Some(100));
382        assert_eq!(props.nats_properties_consumer.headers_only, Some(true));
383        assert_eq!(props.nats_properties_consumer.max_batch, Some(10));
384        assert_eq!(props.nats_properties_consumer.max_bytes, Some(1024));
385        assert_eq!(props.nats_properties_consumer.max_expires, Some(24));
386        assert_eq!(props.nats_properties_consumer.inactive_threshold, Some(10));
387        assert_eq!(props.nats_properties_consumer.num_replicas, Some(3));
388        assert_eq!(props.nats_properties_consumer.memory_storage, Some(true));
389        assert_eq!(
390            props.nats_properties_consumer.backoff,
391            Some(vec![2, 10, 15])
392        );
393    }
394}