risingwave_connector/sink/
nats.rs1use core::fmt::Debug;
15use core::future::IntoFuture;
16use std::collections::BTreeMap;
17
18use anyhow::{Context as _, anyhow};
19use async_nats::jetstream::context::Context;
20use futures::FutureExt;
21use futures::prelude::TryFuture;
22use risingwave_common::array::StreamChunk;
23use risingwave_common::catalog::Schema;
24use serde_derive::Deserialize;
25use serde_with::serde_as;
26use tokio_retry::Retry;
27use tokio_retry::strategy::{ExponentialBackoff, jitter};
28use with_options::WithOptions;
29
30use super::encoder::{
31 DateHandlingMode, JsonbHandlingMode, TimeHandlingMode, TimestamptzHandlingMode,
32};
33use super::utils::chunk_to_json;
34use super::{DummySinkCommitCoordinator, SinkWriterParam};
35use crate::connector_common::NatsCommon;
36use crate::enforce_secret::EnforceSecret;
37use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
38use crate::sink::log_store::DeliveryFutureManagerAddFuture;
39use crate::sink::writer::{
40 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
41};
42use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, Sink, SinkError, SinkParam};
43
44pub const NATS_SINK: &str = "nats";
45const NATS_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
46
47#[serde_as]
48#[derive(Clone, Debug, Deserialize, WithOptions)]
49pub struct NatsConfig {
50 #[serde(flatten)]
51 pub common: NatsCommon,
52 pub r#type: String,
54}
55
56#[derive(Clone, Debug)]
57pub struct NatsSink {
58 pub config: NatsConfig,
59 schema: Schema,
60 is_append_only: bool,
61}
62
63impl EnforceSecret for NatsSink {
64 fn enforce_secret<'a>(
65 prop_iter: impl Iterator<Item = &'a str>,
66 ) -> crate::error::ConnectorResult<()> {
67 for prop in prop_iter {
68 NatsCommon::enforce_one(prop)?;
69 }
70 Ok(())
71 }
72}
73
74pub struct NatsSinkWriter {
76 pub config: NatsConfig,
77 context: Context,
78 #[expect(dead_code)]
79 schema: Schema,
80 json_encoder: JsonEncoder,
81}
82
83pub type NatsSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
84
85impl NatsConfig {
87 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
88 let config = serde_json::from_value::<NatsConfig>(serde_json::to_value(values).unwrap())
89 .map_err(|e| SinkError::Config(anyhow!(e)))?;
90 if config.r#type != SINK_TYPE_APPEND_ONLY {
91 Err(SinkError::Config(anyhow!(
92 "NATS sink only supports append-only mode"
93 )))
94 } else {
95 Ok(config)
96 }
97 }
98}
99
100impl TryFrom<SinkParam> for NatsSink {
101 type Error = SinkError;
102
103 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
104 let schema = param.schema();
105 let config = NatsConfig::from_btreemap(param.properties)?;
106 Ok(Self {
107 config,
108 schema,
109 is_append_only: param.sink_type.is_append_only(),
110 })
111 }
112}
113
114impl Sink for NatsSink {
115 type Coordinator = DummySinkCommitCoordinator;
116 type LogSinker = AsyncTruncateLogSinkerOf<NatsSinkWriter>;
117
118 const SINK_NAME: &'static str = NATS_SINK;
119
120 async fn validate(&self) -> Result<()> {
121 if !self.is_append_only {
122 return Err(SinkError::Nats(anyhow!(
123 "NATS sink only supports append-only mode"
124 )));
125 }
126 let _client = (self.config.common.build_client().await)
127 .context("validate nats sink error")
128 .map_err(SinkError::Nats)?;
129 Ok(())
130 }
131
132 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
133 Ok(
134 NatsSinkWriter::new(self.config.clone(), self.schema.clone())
135 .await?
136 .into_log_sinker(NATS_SEND_FUTURE_BUFFER_MAX_SIZE),
137 )
138 }
139}
140
141impl NatsSinkWriter {
142 pub async fn new(config: NatsConfig, schema: Schema) -> Result<Self> {
143 let context = config
144 .common
145 .build_context()
146 .await
147 .map_err(|e| SinkError::Nats(anyhow!(e)))?;
148 Ok::<_, SinkError>(Self {
149 config: config.clone(),
150 context,
151 schema: schema.clone(),
152 json_encoder: JsonEncoder::new(
153 schema,
154 None,
155 DateHandlingMode::FromCe,
156 TimestampHandlingMode::Milli,
157 TimestamptzHandlingMode::UtcWithoutSuffix,
158 TimeHandlingMode::Milli,
159 JsonbHandlingMode::String,
160 ),
161 })
162 }
163}
164
165impl AsyncTruncateSinkWriter for NatsSinkWriter {
166 type DeliveryFuture = NatsSinkDeliveryFuture;
167
168 async fn write_chunk<'a>(
169 &'a mut self,
170 chunk: StreamChunk,
171 mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
172 ) -> Result<()> {
173 let mut data = chunk_to_json(chunk, &self.json_encoder)?;
174 for item in &mut data {
175 let publish_ack_future = Retry::spawn(
176 ExponentialBackoff::from_millis(100).map(jitter).take(3),
177 || async {
178 self.context
179 .publish(self.config.common.subject.clone(), item.clone().into())
180 .await
181 .context("nats sink error")
182 .map_err(SinkError::Nats)
183 },
184 )
185 .await
186 .context("nats sink error")
187 .map_err(SinkError::Nats)?;
188 let future = publish_ack_future.into_future().map(|result| {
189 result
190 .context("Nats sink error")
191 .map_err(SinkError::Nats)
192 .map(|_| ())
193 });
194 add_future.add_future_may_await(future).await?;
195 }
196 Ok(())
197 }
198}