risingwave_connector/sink/
nats.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::fmt::Debug;
16use core::future::IntoFuture;
17use std::collections::BTreeMap;
18use std::sync::Arc;
19
20use anyhow::{Context as _, anyhow};
21use async_nats::jetstream::context::Context;
22use futures::FutureExt;
23use futures::prelude::TryFuture;
24use risingwave_common::array::StreamChunk;
25use risingwave_common::catalog::Schema;
26use serde::Deserialize;
27use serde_with::serde_as;
28use tokio_retry::Retry;
29use tokio_retry::strategy::{ExponentialBackoff, jitter};
30use with_options::WithOptions;
31
32use super::SinkWriterParam;
33use super::encoder::{
34    DateHandlingMode, JsonbHandlingMode, TimeHandlingMode, TimestamptzHandlingMode,
35};
36use super::utils::chunk_to_json;
37use crate::connector_common::NatsCommon;
38use crate::enforce_secret::EnforceSecret;
39use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
40use crate::sink::log_store::DeliveryFutureManagerAddFuture;
41use crate::sink::writer::{
42    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
43};
44use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, Sink, SinkError, SinkParam};
45
/// Connector name of the NATS sink; also exposed as `Sink::SINK_NAME` for `NatsSink`.
pub const NATS_SINK: &str = "nats";
/// Value handed to `into_log_sinker` when the log sinker is built.
// NOTE(review): presumably the upper bound on buffered, not-yet-acknowledged delivery
// futures — confirm against `AsyncTruncateSinkWriterExt::into_log_sinker`.
const NATS_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65_536;
48
49#[serde_as]
50#[derive(Clone, Debug, Deserialize, WithOptions)]
51pub struct NatsConfig {
52    #[serde(flatten)]
53    pub common: NatsCommon,
54    // accept "append-only"
55    pub r#type: String,
56}
57
58#[derive(Clone, Debug)]
59pub struct NatsSink {
60    pub config: NatsConfig,
61    schema: Schema,
62    is_append_only: bool,
63}
64
65impl EnforceSecret for NatsSink {
66    fn enforce_secret<'a>(
67        prop_iter: impl Iterator<Item = &'a str>,
68    ) -> crate::error::ConnectorResult<()> {
69        for prop in prop_iter {
70            NatsCommon::enforce_one(prop)?;
71        }
72        Ok(())
73    }
74}
75
76// sink write
77pub struct NatsSinkWriter {
78    pub config: NatsConfig,
79    context: Context,
80    /// Hold the client Arc to keep it alive. This allows the shared client cache to reuse
81    /// the connection while we're still using it.
82    #[expect(dead_code)]
83    client: Arc<async_nats::Client>,
84    #[expect(dead_code)]
85    schema: Schema,
86    json_encoder: JsonEncoder,
87}
88
89pub type NatsSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
90
91/// Basic data types for use with the nats interface
92impl NatsConfig {
93    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
94        let config = serde_json::from_value::<NatsConfig>(serde_json::to_value(values).unwrap())
95            .map_err(|e| SinkError::Config(anyhow!(e)))?;
96        if config.r#type != SINK_TYPE_APPEND_ONLY {
97            Err(SinkError::Config(anyhow!(
98                "NATS sink only supports append-only mode"
99            )))
100        } else {
101            Ok(config)
102        }
103    }
104}
105
106impl TryFrom<SinkParam> for NatsSink {
107    type Error = SinkError;
108
109    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
110        let schema = param.schema();
111        let config = NatsConfig::from_btreemap(param.properties)?;
112        Ok(Self {
113            config,
114            schema,
115            is_append_only: param.sink_type.is_append_only(),
116        })
117    }
118}
119
120impl Sink for NatsSink {
121    type LogSinker = AsyncTruncateLogSinkerOf<NatsSinkWriter>;
122
123    const SINK_NAME: &'static str = NATS_SINK;
124
125    async fn validate(&self) -> Result<()> {
126        if !self.is_append_only {
127            return Err(SinkError::Nats(anyhow!(
128                "NATS sink only supports append-only mode"
129            )));
130        }
131        let _client = (self.config.common.build_client().await)
132            .context("validate nats sink error")
133            .map_err(SinkError::Nats)?;
134        Ok(())
135    }
136
137    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
138        Ok(
139            NatsSinkWriter::new(self.config.clone(), self.schema.clone())
140                .await?
141                .into_log_sinker(NATS_SEND_FUTURE_BUFFER_MAX_SIZE),
142        )
143    }
144}
145
146impl NatsSinkWriter {
147    pub async fn new(config: NatsConfig, schema: Schema) -> Result<Self> {
148        let client = config
149            .common
150            .build_client()
151            .await
152            .map_err(|e| SinkError::Nats(anyhow!(e)))?;
153        let context = NatsCommon::build_context_from_client(&client);
154        Ok::<_, SinkError>(Self {
155            config: config.clone(),
156            context,
157            client,
158            schema: schema.clone(),
159            json_encoder: JsonEncoder::new(
160                schema,
161                None,
162                DateHandlingMode::FromCe,
163                TimestampHandlingMode::Milli,
164                TimestamptzHandlingMode::UtcWithoutSuffix,
165                TimeHandlingMode::Milli,
166                JsonbHandlingMode::String,
167            ),
168        })
169    }
170}
171
172impl AsyncTruncateSinkWriter for NatsSinkWriter {
173    type DeliveryFuture = NatsSinkDeliveryFuture;
174
175    #[define_opaque(NatsSinkDeliveryFuture)]
176    async fn write_chunk<'a>(
177        &'a mut self,
178        chunk: StreamChunk,
179        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
180    ) -> Result<()> {
181        let mut data = chunk_to_json(chunk, &self.json_encoder)?;
182        for item in &mut data {
183            let publish_ack_future = Retry::spawn(
184                ExponentialBackoff::from_millis(100).map(jitter).take(3),
185                || async {
186                    self.context
187                        .publish(self.config.common.subject.clone(), item.clone().into())
188                        .await
189                        .context("nats sink error")
190                        .map_err(SinkError::Nats)
191                },
192            )
193            .await
194            .context("nats sink error")
195            .map_err(SinkError::Nats)?;
196            let future = publish_ack_future.into_future().map(|result| {
197                result
198                    .context("Nats sink error")
199                    .map_err(SinkError::Nats)
200                    .map(|_| ())
201            });
202            add_future.add_future_may_await(future).await?;
203        }
204        Ok(())
205    }
206}