risingwave_connector/sink/
nats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use core::fmt::Debug;
15use core::future::IntoFuture;
16use std::collections::BTreeMap;
17
18use anyhow::{Context as _, anyhow};
19use async_nats::jetstream::context::Context;
20use futures::FutureExt;
21use futures::prelude::TryFuture;
22use risingwave_common::array::StreamChunk;
23use risingwave_common::catalog::Schema;
24use serde_derive::Deserialize;
25use serde_with::serde_as;
26use tokio_retry::Retry;
27use tokio_retry::strategy::{ExponentialBackoff, jitter};
28use with_options::WithOptions;
29
30use super::encoder::{
31    DateHandlingMode, JsonbHandlingMode, TimeHandlingMode, TimestamptzHandlingMode,
32};
33use super::utils::chunk_to_json;
34use super::{DummySinkCommitCoordinator, SinkWriterParam};
35use crate::connector_common::NatsCommon;
36use crate::enforce_secret::EnforceSecret;
37use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
38use crate::sink::log_store::DeliveryFutureManagerAddFuture;
39use crate::sink::writer::{
40    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
41};
42use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, Sink, SinkError, SinkParam};
43
/// Connector name of the NATS sink; also used as `Sink::SINK_NAME` for [`NatsSink`].
pub const NATS_SINK: &str = "nats";
/// Size passed to `into_log_sinker` when the log sinker is built.
/// Presumably the maximum number of buffered delivery futures — confirm against
/// `AsyncTruncateSinkWriterExt`.
const NATS_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65_536;
46
47#[serde_as]
48#[derive(Clone, Debug, Deserialize, WithOptions)]
49pub struct NatsConfig {
50    #[serde(flatten)]
51    pub common: NatsCommon,
52    // accept "append-only"
53    pub r#type: String,
54}
55
56#[derive(Clone, Debug)]
57pub struct NatsSink {
58    pub config: NatsConfig,
59    schema: Schema,
60    is_append_only: bool,
61}
62
63impl EnforceSecret for NatsSink {
64    fn enforce_secret<'a>(
65        prop_iter: impl Iterator<Item = &'a str>,
66    ) -> crate::error::ConnectorResult<()> {
67        for prop in prop_iter {
68            NatsCommon::enforce_one(prop)?;
69        }
70        Ok(())
71    }
72}
73
74// sink write
75pub struct NatsSinkWriter {
76    pub config: NatsConfig,
77    context: Context,
78    #[expect(dead_code)]
79    schema: Schema,
80    json_encoder: JsonEncoder,
81}
82
83pub type NatsSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
84
85/// Basic data types for use with the nats interface
86impl NatsConfig {
87    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
88        let config = serde_json::from_value::<NatsConfig>(serde_json::to_value(values).unwrap())
89            .map_err(|e| SinkError::Config(anyhow!(e)))?;
90        if config.r#type != SINK_TYPE_APPEND_ONLY {
91            Err(SinkError::Config(anyhow!(
92                "NATS sink only supports append-only mode"
93            )))
94        } else {
95            Ok(config)
96        }
97    }
98}
99
100impl TryFrom<SinkParam> for NatsSink {
101    type Error = SinkError;
102
103    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
104        let schema = param.schema();
105        let config = NatsConfig::from_btreemap(param.properties)?;
106        Ok(Self {
107            config,
108            schema,
109            is_append_only: param.sink_type.is_append_only(),
110        })
111    }
112}
113
114impl Sink for NatsSink {
115    type Coordinator = DummySinkCommitCoordinator;
116    type LogSinker = AsyncTruncateLogSinkerOf<NatsSinkWriter>;
117
118    const SINK_NAME: &'static str = NATS_SINK;
119
120    async fn validate(&self) -> Result<()> {
121        if !self.is_append_only {
122            return Err(SinkError::Nats(anyhow!(
123                "NATS sink only supports append-only mode"
124            )));
125        }
126        let _client = (self.config.common.build_client().await)
127            .context("validate nats sink error")
128            .map_err(SinkError::Nats)?;
129        Ok(())
130    }
131
132    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
133        Ok(
134            NatsSinkWriter::new(self.config.clone(), self.schema.clone())
135                .await?
136                .into_log_sinker(NATS_SEND_FUTURE_BUFFER_MAX_SIZE),
137        )
138    }
139}
140
141impl NatsSinkWriter {
142    pub async fn new(config: NatsConfig, schema: Schema) -> Result<Self> {
143        let context = config
144            .common
145            .build_context()
146            .await
147            .map_err(|e| SinkError::Nats(anyhow!(e)))?;
148        Ok::<_, SinkError>(Self {
149            config: config.clone(),
150            context,
151            schema: schema.clone(),
152            json_encoder: JsonEncoder::new(
153                schema,
154                None,
155                DateHandlingMode::FromCe,
156                TimestampHandlingMode::Milli,
157                TimestamptzHandlingMode::UtcWithoutSuffix,
158                TimeHandlingMode::Milli,
159                JsonbHandlingMode::String,
160            ),
161        })
162    }
163}
164
165impl AsyncTruncateSinkWriter for NatsSinkWriter {
166    type DeliveryFuture = NatsSinkDeliveryFuture;
167
168    async fn write_chunk<'a>(
169        &'a mut self,
170        chunk: StreamChunk,
171        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
172    ) -> Result<()> {
173        let mut data = chunk_to_json(chunk, &self.json_encoder)?;
174        for item in &mut data {
175            let publish_ack_future = Retry::spawn(
176                ExponentialBackoff::from_millis(100).map(jitter).take(3),
177                || async {
178                    self.context
179                        .publish(self.config.common.subject.clone(), item.clone().into())
180                        .await
181                        .context("nats sink error")
182                        .map_err(SinkError::Nats)
183                },
184            )
185            .await
186            .context("nats sink error")
187            .map_err(SinkError::Nats)?;
188            let future = publish_ack_future.into_future().map(|result| {
189                result
190                    .context("Nats sink error")
191                    .map_err(SinkError::Nats)
192                    .map(|_| ())
193            });
194            add_future.add_future_may_await(future).await?;
195        }
196        Ok(())
197    }
198}