risingwave_connector/sink/
nats.rs1use core::fmt::Debug;
16use core::future::IntoFuture;
17use std::collections::BTreeMap;
18use std::sync::Arc;
19
20use anyhow::{Context as _, anyhow};
21use async_nats::jetstream::context::Context;
22use futures::FutureExt;
23use futures::prelude::TryFuture;
24use risingwave_common::array::StreamChunk;
25use risingwave_common::catalog::Schema;
26use serde::Deserialize;
27use serde_with::serde_as;
28use tokio_retry::Retry;
29use tokio_retry::strategy::{ExponentialBackoff, jitter};
30use with_options::WithOptions;
31
32use super::SinkWriterParam;
33use super::encoder::{
34 DateHandlingMode, JsonbHandlingMode, TimeHandlingMode, TimestamptzHandlingMode,
35};
36use super::utils::chunk_to_json;
37use crate::connector_common::NatsCommon;
38use crate::enforce_secret::EnforceSecret;
39use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
40use crate::sink::log_store::DeliveryFutureManagerAddFuture;
41use crate::sink::writer::{
42 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
43};
44use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, Sink, SinkError, SinkParam};
45
46pub const NATS_SINK: &str = "nats";
47const NATS_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
48
49#[serde_as]
50#[derive(Clone, Debug, Deserialize, WithOptions)]
51pub struct NatsConfig {
52 #[serde(flatten)]
53 pub common: NatsCommon,
54 pub r#type: String,
56}
57
58#[derive(Clone, Debug)]
59pub struct NatsSink {
60 pub config: NatsConfig,
61 schema: Schema,
62 is_append_only: bool,
63}
64
65impl EnforceSecret for NatsSink {
66 fn enforce_secret<'a>(
67 prop_iter: impl Iterator<Item = &'a str>,
68 ) -> crate::error::ConnectorResult<()> {
69 for prop in prop_iter {
70 NatsCommon::enforce_one(prop)?;
71 }
72 Ok(())
73 }
74}
75
76pub struct NatsSinkWriter {
78 pub config: NatsConfig,
79 context: Context,
80 #[expect(dead_code)]
83 client: Arc<async_nats::Client>,
84 #[expect(dead_code)]
85 schema: Schema,
86 json_encoder: JsonEncoder,
87}
88
89pub type NatsSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
90
91impl NatsConfig {
93 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
94 let config = serde_json::from_value::<NatsConfig>(serde_json::to_value(values).unwrap())
95 .map_err(|e| SinkError::Config(anyhow!(e)))?;
96 if config.r#type != SINK_TYPE_APPEND_ONLY {
97 Err(SinkError::Config(anyhow!(
98 "NATS sink only supports append-only mode"
99 )))
100 } else {
101 Ok(config)
102 }
103 }
104}
105
106impl TryFrom<SinkParam> for NatsSink {
107 type Error = SinkError;
108
109 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
110 let schema = param.schema();
111 let config = NatsConfig::from_btreemap(param.properties)?;
112 Ok(Self {
113 config,
114 schema,
115 is_append_only: param.sink_type.is_append_only(),
116 })
117 }
118}
119
120impl Sink for NatsSink {
121 type LogSinker = AsyncTruncateLogSinkerOf<NatsSinkWriter>;
122
123 const SINK_NAME: &'static str = NATS_SINK;
124
125 async fn validate(&self) -> Result<()> {
126 if !self.is_append_only {
127 return Err(SinkError::Nats(anyhow!(
128 "NATS sink only supports append-only mode"
129 )));
130 }
131 let _client = (self.config.common.build_client().await)
132 .context("validate nats sink error")
133 .map_err(SinkError::Nats)?;
134 Ok(())
135 }
136
137 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
138 Ok(
139 NatsSinkWriter::new(self.config.clone(), self.schema.clone())
140 .await?
141 .into_log_sinker(NATS_SEND_FUTURE_BUFFER_MAX_SIZE),
142 )
143 }
144}
145
146impl NatsSinkWriter {
147 pub async fn new(config: NatsConfig, schema: Schema) -> Result<Self> {
148 let client = config
149 .common
150 .build_client()
151 .await
152 .map_err(|e| SinkError::Nats(anyhow!(e)))?;
153 let context = NatsCommon::build_context_from_client(&client);
154 Ok::<_, SinkError>(Self {
155 config: config.clone(),
156 context,
157 client,
158 schema: schema.clone(),
159 json_encoder: JsonEncoder::new(
160 schema,
161 None,
162 DateHandlingMode::FromCe,
163 TimestampHandlingMode::Milli,
164 TimestamptzHandlingMode::UtcWithoutSuffix,
165 TimeHandlingMode::Milli,
166 JsonbHandlingMode::String,
167 ),
168 })
169 }
170}
171
172impl AsyncTruncateSinkWriter for NatsSinkWriter {
173 type DeliveryFuture = NatsSinkDeliveryFuture;
174
175 #[define_opaque(NatsSinkDeliveryFuture)]
176 async fn write_chunk<'a>(
177 &'a mut self,
178 chunk: StreamChunk,
179 mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
180 ) -> Result<()> {
181 let mut data = chunk_to_json(chunk, &self.json_encoder)?;
182 for item in &mut data {
183 let publish_ack_future = Retry::spawn(
184 ExponentialBackoff::from_millis(100).map(jitter).take(3),
185 || async {
186 self.context
187 .publish(self.config.common.subject.clone(), item.clone().into())
188 .await
189 .context("nats sink error")
190 .map_err(SinkError::Nats)
191 },
192 )
193 .await
194 .context("nats sink error")
195 .map_err(SinkError::Nats)?;
196 let future = publish_ack_future.into_future().map(|result| {
197 result
198 .context("Nats sink error")
199 .map_err(SinkError::Nats)
200 .map(|_| ())
201 });
202 add_future.add_future_may_await(future).await?;
203 }
204 Ok(())
205 }
206}