risingwave_connector/sink/
nats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use core::fmt::Debug;
15use core::future::IntoFuture;
16use std::collections::BTreeMap;
17
18use anyhow::{Context as _, anyhow};
19use async_nats::jetstream::context::Context;
20use futures::FutureExt;
21use futures::prelude::TryFuture;
22use risingwave_common::array::StreamChunk;
23use risingwave_common::catalog::Schema;
24use serde_derive::Deserialize;
25use serde_with::serde_as;
26use tokio_retry::Retry;
27use tokio_retry::strategy::{ExponentialBackoff, jitter};
28use with_options::WithOptions;
29
30use super::encoder::{
31    DateHandlingMode, JsonbHandlingMode, TimeHandlingMode, TimestamptzHandlingMode,
32};
33use super::utils::chunk_to_json;
34use super::{DummySinkCommitCoordinator, SinkWriterParam};
35use crate::connector_common::NatsCommon;
36use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
37use crate::sink::log_store::DeliveryFutureManagerAddFuture;
38use crate::sink::writer::{
39    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
40};
41use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, Sink, SinkError, SinkParam};
42
/// Connector name of the NATS sink; also exposed as `Sink::SINK_NAME` for `NatsSink`.
pub const NATS_SINK: &str = "nats";
/// Size passed to `into_log_sinker` when the log sinker is built from a `NatsSinkWriter`.
// NOTE(review): presumably caps the number of pending delivery futures — confirm against
// `AsyncTruncateSinkWriterExt::into_log_sinker`.
const NATS_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
45
/// Options of the NATS sink, deserialized from the sink's string properties
/// (see `NatsConfig::from_btreemap`).
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct NatsConfig {
    /// NATS settings (`NatsCommon`), flattened so its keys live next to `type`.
    #[serde(flatten)]
    pub common: NatsCommon,
    /// Sink type. `from_btreemap` rejects any value other than `SINK_TYPE_APPEND_ONLY`.
    pub r#type: String,
}
54
/// NATS sink definition: the parsed configuration plus the schema of the rows to deliver.
#[derive(Clone, Debug)]
pub struct NatsSink {
    /// Parsed sink options.
    pub config: NatsConfig,
    /// Schema taken from the `SinkParam` the sink was created from.
    schema: Schema,
    /// Whether the sink was declared append-only; `validate` fails when this is `false`.
    is_append_only: bool,
}
61
/// Writer that encodes stream chunks as JSON and publishes them through a JetStream context.
pub struct NatsSinkWriter {
    /// Sink options; `common.subject` is the subject every message is published to.
    pub config: NatsConfig,
    /// JetStream context used for publishing.
    context: Context,
    /// Schema of the written rows. Stored but not read, hence the `dead_code` expectation.
    #[expect(dead_code)]
    schema: Schema,
    /// Encoder used to turn each row of a chunk into a JSON payload.
    json_encoder: JsonEncoder,
}
70
/// Opaque future type registered for every published message in `write_chunk`.
/// It resolves to `Ok(())` or to a `SinkError`.
pub type NatsSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
72
73/// Basic data types for use with the nats interface
74impl NatsConfig {
75    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
76        let config = serde_json::from_value::<NatsConfig>(serde_json::to_value(values).unwrap())
77            .map_err(|e| SinkError::Config(anyhow!(e)))?;
78        if config.r#type != SINK_TYPE_APPEND_ONLY {
79            Err(SinkError::Config(anyhow!(
80                "NATS sink only supports append-only mode"
81            )))
82        } else {
83            Ok(config)
84        }
85    }
86}
87
88impl TryFrom<SinkParam> for NatsSink {
89    type Error = SinkError;
90
91    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
92        let schema = param.schema();
93        let config = NatsConfig::from_btreemap(param.properties)?;
94        Ok(Self {
95            config,
96            schema,
97            is_append_only: param.sink_type.is_append_only(),
98        })
99    }
100}
101
102impl Sink for NatsSink {
103    type Coordinator = DummySinkCommitCoordinator;
104    type LogSinker = AsyncTruncateLogSinkerOf<NatsSinkWriter>;
105
106    const SINK_NAME: &'static str = NATS_SINK;
107
108    async fn validate(&self) -> Result<()> {
109        if !self.is_append_only {
110            return Err(SinkError::Nats(anyhow!(
111                "NATS sink only supports append-only mode"
112            )));
113        }
114        let _client = (self.config.common.build_client().await)
115            .context("validate nats sink error")
116            .map_err(SinkError::Nats)?;
117        Ok(())
118    }
119
120    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
121        Ok(
122            NatsSinkWriter::new(self.config.clone(), self.schema.clone())
123                .await?
124                .into_log_sinker(NATS_SEND_FUTURE_BUFFER_MAX_SIZE),
125        )
126    }
127}
128
129impl NatsSinkWriter {
130    pub async fn new(config: NatsConfig, schema: Schema) -> Result<Self> {
131        let context = config
132            .common
133            .build_context()
134            .await
135            .map_err(|e| SinkError::Nats(anyhow!(e)))?;
136        Ok::<_, SinkError>(Self {
137            config: config.clone(),
138            context,
139            schema: schema.clone(),
140            json_encoder: JsonEncoder::new(
141                schema,
142                None,
143                DateHandlingMode::FromCe,
144                TimestampHandlingMode::Milli,
145                TimestamptzHandlingMode::UtcWithoutSuffix,
146                TimeHandlingMode::Milli,
147                JsonbHandlingMode::String,
148            ),
149        })
150    }
151}
152
153impl AsyncTruncateSinkWriter for NatsSinkWriter {
154    type DeliveryFuture = NatsSinkDeliveryFuture;
155
156    async fn write_chunk<'a>(
157        &'a mut self,
158        chunk: StreamChunk,
159        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
160    ) -> Result<()> {
161        let mut data = chunk_to_json(chunk, &self.json_encoder)?;
162        for item in &mut data {
163            let publish_ack_future = Retry::spawn(
164                ExponentialBackoff::from_millis(100).map(jitter).take(3),
165                || async {
166                    self.context
167                        .publish(self.config.common.subject.clone(), item.clone().into())
168                        .await
169                        .context("nats sink error")
170                        .map_err(SinkError::Nats)
171                },
172            )
173            .await
174            .context("nats sink error")
175            .map_err(SinkError::Nats)?;
176            let future = publish_ack_future.into_future().map(|result| {
177                result
178                    .context("Nats sink error")
179                    .map_err(SinkError::Nats)
180                    .map(|_| ())
181            });
182            add_future.add_future_may_await(future).await?;
183        }
184        Ok(())
185    }
186}