risingwave_connector/sink/
nats.rs1use core::fmt::Debug;
15use core::future::IntoFuture;
16use std::collections::BTreeMap;
17
18use anyhow::{Context as _, anyhow};
19use async_nats::jetstream::context::Context;
20use futures::FutureExt;
21use futures::prelude::TryFuture;
22use risingwave_common::array::StreamChunk;
23use risingwave_common::catalog::Schema;
24use serde_derive::Deserialize;
25use serde_with::serde_as;
26use tokio_retry::Retry;
27use tokio_retry::strategy::{ExponentialBackoff, jitter};
28use with_options::WithOptions;
29
30use super::SinkWriterParam;
31use super::encoder::{
32 DateHandlingMode, JsonbHandlingMode, TimeHandlingMode, TimestamptzHandlingMode,
33};
34use super::utils::chunk_to_json;
35use crate::connector_common::NatsCommon;
36use crate::enforce_secret::EnforceSecret;
37use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
38use crate::sink::log_store::DeliveryFutureManagerAddFuture;
39use crate::sink::writer::{
40 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
41};
42use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, Sink, SinkError, SinkParam};
43
44pub const NATS_SINK: &str = "nats";
45const NATS_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
46
47#[serde_as]
48#[derive(Clone, Debug, Deserialize, WithOptions)]
49pub struct NatsConfig {
50 #[serde(flatten)]
51 pub common: NatsCommon,
52 pub r#type: String,
54}
55
56#[derive(Clone, Debug)]
57pub struct NatsSink {
58 pub config: NatsConfig,
59 schema: Schema,
60 is_append_only: bool,
61}
62
63impl EnforceSecret for NatsSink {
64 fn enforce_secret<'a>(
65 prop_iter: impl Iterator<Item = &'a str>,
66 ) -> crate::error::ConnectorResult<()> {
67 for prop in prop_iter {
68 NatsCommon::enforce_one(prop)?;
69 }
70 Ok(())
71 }
72}
73
74pub struct NatsSinkWriter {
76 pub config: NatsConfig,
77 context: Context,
78 #[expect(dead_code)]
79 schema: Schema,
80 json_encoder: JsonEncoder,
81}
82
83pub type NatsSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
84
85impl NatsConfig {
87 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
88 let config = serde_json::from_value::<NatsConfig>(serde_json::to_value(values).unwrap())
89 .map_err(|e| SinkError::Config(anyhow!(e)))?;
90 if config.r#type != SINK_TYPE_APPEND_ONLY {
91 Err(SinkError::Config(anyhow!(
92 "NATS sink only supports append-only mode"
93 )))
94 } else {
95 Ok(config)
96 }
97 }
98}
99
100impl TryFrom<SinkParam> for NatsSink {
101 type Error = SinkError;
102
103 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
104 let schema = param.schema();
105 let config = NatsConfig::from_btreemap(param.properties)?;
106 Ok(Self {
107 config,
108 schema,
109 is_append_only: param.sink_type.is_append_only(),
110 })
111 }
112}
113
114impl Sink for NatsSink {
115 type LogSinker = AsyncTruncateLogSinkerOf<NatsSinkWriter>;
116
117 const SINK_NAME: &'static str = NATS_SINK;
118
119 async fn validate(&self) -> Result<()> {
120 if !self.is_append_only {
121 return Err(SinkError::Nats(anyhow!(
122 "NATS sink only supports append-only mode"
123 )));
124 }
125 let _client = (self.config.common.build_client().await)
126 .context("validate nats sink error")
127 .map_err(SinkError::Nats)?;
128 Ok(())
129 }
130
131 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
132 Ok(
133 NatsSinkWriter::new(self.config.clone(), self.schema.clone())
134 .await?
135 .into_log_sinker(NATS_SEND_FUTURE_BUFFER_MAX_SIZE),
136 )
137 }
138}
139
140impl NatsSinkWriter {
141 pub async fn new(config: NatsConfig, schema: Schema) -> Result<Self> {
142 let context = config
143 .common
144 .build_context()
145 .await
146 .map_err(|e| SinkError::Nats(anyhow!(e)))?;
147 Ok::<_, SinkError>(Self {
148 config: config.clone(),
149 context,
150 schema: schema.clone(),
151 json_encoder: JsonEncoder::new(
152 schema,
153 None,
154 DateHandlingMode::FromCe,
155 TimestampHandlingMode::Milli,
156 TimestamptzHandlingMode::UtcWithoutSuffix,
157 TimeHandlingMode::Milli,
158 JsonbHandlingMode::String,
159 ),
160 })
161 }
162}
163
164impl AsyncTruncateSinkWriter for NatsSinkWriter {
165 type DeliveryFuture = NatsSinkDeliveryFuture;
166
167 #[define_opaque(NatsSinkDeliveryFuture)]
168 async fn write_chunk<'a>(
169 &'a mut self,
170 chunk: StreamChunk,
171 mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
172 ) -> Result<()> {
173 let mut data = chunk_to_json(chunk, &self.json_encoder)?;
174 for item in &mut data {
175 let publish_ack_future = Retry::spawn(
176 ExponentialBackoff::from_millis(100).map(jitter).take(3),
177 || async {
178 self.context
179 .publish(self.config.common.subject.clone(), item.clone().into())
180 .await
181 .context("nats sink error")
182 .map_err(SinkError::Nats)
183 },
184 )
185 .await
186 .context("nats sink error")
187 .map_err(SinkError::Nats)?;
188 let future = publish_ack_future.into_future().map(|result| {
189 result
190 .context("Nats sink error")
191 .map_err(SinkError::Nats)
192 .map(|_| ())
193 });
194 add_future.add_future_may_await(future).await?;
195 }
196 Ok(())
197 }
198}