risingwave_connector/sink/
nats.rs1use core::fmt::Debug;
15use core::future::IntoFuture;
16use std::collections::BTreeMap;
17use std::sync::Arc;
18
19use anyhow::{Context as _, anyhow};
20use async_nats::jetstream::context::Context;
21use futures::FutureExt;
22use futures::prelude::TryFuture;
23use risingwave_common::array::StreamChunk;
24use risingwave_common::catalog::Schema;
25use serde::Deserialize;
26use serde_with::serde_as;
27use tokio_retry::Retry;
28use tokio_retry::strategy::{ExponentialBackoff, jitter};
29use with_options::WithOptions;
30
31use super::SinkWriterParam;
32use super::encoder::{
33 DateHandlingMode, JsonbHandlingMode, TimeHandlingMode, TimestamptzHandlingMode,
34};
35use super::utils::chunk_to_json;
36use crate::connector_common::NatsCommon;
37use crate::enforce_secret::EnforceSecret;
38use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
39use crate::sink::log_store::DeliveryFutureManagerAddFuture;
40use crate::sink::writer::{
41 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
42};
43use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, Sink, SinkError, SinkParam};
44
/// Connector name of the NATS sink; also used as `Sink::SINK_NAME` for `NatsSink`.
pub const NATS_SINK: &str = "nats";
/// Size passed to `into_log_sinker` when the log sinker is created (2^16 = 65536).
const NATS_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 1 << 16;
47
48#[serde_as]
49#[derive(Clone, Debug, Deserialize, WithOptions)]
50pub struct NatsConfig {
51 #[serde(flatten)]
52 pub common: NatsCommon,
53 pub r#type: String,
55}
56
57#[derive(Clone, Debug)]
58pub struct NatsSink {
59 pub config: NatsConfig,
60 schema: Schema,
61 is_append_only: bool,
62}
63
64impl EnforceSecret for NatsSink {
65 fn enforce_secret<'a>(
66 prop_iter: impl Iterator<Item = &'a str>,
67 ) -> crate::error::ConnectorResult<()> {
68 for prop in prop_iter {
69 NatsCommon::enforce_one(prop)?;
70 }
71 Ok(())
72 }
73}
74
/// Writer side of the NATS sink: holds the parsed config, the JetStream context used
/// for publishing, and the JSON encoder applied to every incoming chunk.
pub struct NatsSinkWriter {
    pub config: NatsConfig,
    // JetStream context that `write_chunk` publishes through.
    context: Context,
    // Never read in this file (hence `expect(dead_code)`); presumably retained so the
    // underlying client stays alive as long as `context` — TODO confirm.
    #[expect(dead_code)]
    client: Arc<async_nats::Client>,
    // Copy of the sink schema; currently unread (hence `expect(dead_code)`).
    #[expect(dead_code)]
    schema: Schema,
    // Encoder handed to `chunk_to_json` in `write_chunk`.
    json_encoder: JsonEncoder,
}
87
88pub type NatsSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
89
90impl NatsConfig {
92 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
93 let config = serde_json::from_value::<NatsConfig>(serde_json::to_value(values).unwrap())
94 .map_err(|e| SinkError::Config(anyhow!(e)))?;
95 if config.r#type != SINK_TYPE_APPEND_ONLY {
96 Err(SinkError::Config(anyhow!(
97 "NATS sink only supports append-only mode"
98 )))
99 } else {
100 Ok(config)
101 }
102 }
103}
104
105impl TryFrom<SinkParam> for NatsSink {
106 type Error = SinkError;
107
108 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
109 let schema = param.schema();
110 let config = NatsConfig::from_btreemap(param.properties)?;
111 Ok(Self {
112 config,
113 schema,
114 is_append_only: param.sink_type.is_append_only(),
115 })
116 }
117}
118
119impl Sink for NatsSink {
120 type LogSinker = AsyncTruncateLogSinkerOf<NatsSinkWriter>;
121
122 const SINK_NAME: &'static str = NATS_SINK;
123
124 async fn validate(&self) -> Result<()> {
125 if !self.is_append_only {
126 return Err(SinkError::Nats(anyhow!(
127 "NATS sink only supports append-only mode"
128 )));
129 }
130 let _client = (self.config.common.build_client().await)
131 .context("validate nats sink error")
132 .map_err(SinkError::Nats)?;
133 Ok(())
134 }
135
136 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
137 Ok(
138 NatsSinkWriter::new(self.config.clone(), self.schema.clone())
139 .await?
140 .into_log_sinker(NATS_SEND_FUTURE_BUFFER_MAX_SIZE),
141 )
142 }
143}
144
145impl NatsSinkWriter {
146 pub async fn new(config: NatsConfig, schema: Schema) -> Result<Self> {
147 let client = config
148 .common
149 .build_client()
150 .await
151 .map_err(|e| SinkError::Nats(anyhow!(e)))?;
152 let context = NatsCommon::build_context_from_client(&client);
153 Ok::<_, SinkError>(Self {
154 config: config.clone(),
155 context,
156 client,
157 schema: schema.clone(),
158 json_encoder: JsonEncoder::new(
159 schema,
160 None,
161 DateHandlingMode::FromCe,
162 TimestampHandlingMode::Milli,
163 TimestamptzHandlingMode::UtcWithoutSuffix,
164 TimeHandlingMode::Milli,
165 JsonbHandlingMode::String,
166 ),
167 })
168 }
169}
170
171impl AsyncTruncateSinkWriter for NatsSinkWriter {
172 type DeliveryFuture = NatsSinkDeliveryFuture;
173
174 #[define_opaque(NatsSinkDeliveryFuture)]
175 async fn write_chunk<'a>(
176 &'a mut self,
177 chunk: StreamChunk,
178 mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
179 ) -> Result<()> {
180 let mut data = chunk_to_json(chunk, &self.json_encoder)?;
181 for item in &mut data {
182 let publish_ack_future = Retry::spawn(
183 ExponentialBackoff::from_millis(100).map(jitter).take(3),
184 || async {
185 self.context
186 .publish(self.config.common.subject.clone(), item.clone().into())
187 .await
188 .context("nats sink error")
189 .map_err(SinkError::Nats)
190 },
191 )
192 .await
193 .context("nats sink error")
194 .map_err(SinkError::Nats)?;
195 let future = publish_ack_future.into_future().map(|result| {
196 result
197 .context("Nats sink error")
198 .map_err(SinkError::Nats)
199 .map(|_| ())
200 });
201 add_future.add_future_may_await(future).await?;
202 }
203 Ok(())
204 }
205}