risingwave_connector/sink/
nats.rs1use core::fmt::Debug;
15use core::future::IntoFuture;
16use std::collections::BTreeMap;
17
18use anyhow::{Context as _, anyhow};
19use async_nats::jetstream::context::Context;
20use futures::FutureExt;
21use futures::prelude::TryFuture;
22use risingwave_common::array::StreamChunk;
23use risingwave_common::catalog::Schema;
24use serde_derive::Deserialize;
25use serde_with::serde_as;
26use tokio_retry::Retry;
27use tokio_retry::strategy::{ExponentialBackoff, jitter};
28use with_options::WithOptions;
29
30use super::encoder::{
31 DateHandlingMode, JsonbHandlingMode, TimeHandlingMode, TimestamptzHandlingMode,
32};
33use super::utils::chunk_to_json;
34use super::{DummySinkCommitCoordinator, SinkWriterParam};
35use crate::connector_common::NatsCommon;
36use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
37use crate::sink::log_store::DeliveryFutureManagerAddFuture;
38use crate::sink::writer::{
39 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
40};
41use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, Sink, SinkError, SinkParam};
42
43pub const NATS_SINK: &str = "nats";
44const NATS_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
45
46#[serde_as]
47#[derive(Clone, Debug, Deserialize, WithOptions)]
48pub struct NatsConfig {
49 #[serde(flatten)]
50 pub common: NatsCommon,
51 pub r#type: String,
53}
54
55#[derive(Clone, Debug)]
56pub struct NatsSink {
57 pub config: NatsConfig,
58 schema: Schema,
59 is_append_only: bool,
60}
61
62pub struct NatsSinkWriter {
64 pub config: NatsConfig,
65 context: Context,
66 #[expect(dead_code)]
67 schema: Schema,
68 json_encoder: JsonEncoder,
69}
70
71pub type NatsSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
72
73impl NatsConfig {
75 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
76 let config = serde_json::from_value::<NatsConfig>(serde_json::to_value(values).unwrap())
77 .map_err(|e| SinkError::Config(anyhow!(e)))?;
78 if config.r#type != SINK_TYPE_APPEND_ONLY {
79 Err(SinkError::Config(anyhow!(
80 "NATS sink only supports append-only mode"
81 )))
82 } else {
83 Ok(config)
84 }
85 }
86}
87
88impl TryFrom<SinkParam> for NatsSink {
89 type Error = SinkError;
90
91 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
92 let schema = param.schema();
93 let config = NatsConfig::from_btreemap(param.properties)?;
94 Ok(Self {
95 config,
96 schema,
97 is_append_only: param.sink_type.is_append_only(),
98 })
99 }
100}
101
102impl Sink for NatsSink {
103 type Coordinator = DummySinkCommitCoordinator;
104 type LogSinker = AsyncTruncateLogSinkerOf<NatsSinkWriter>;
105
106 const SINK_NAME: &'static str = NATS_SINK;
107
108 async fn validate(&self) -> Result<()> {
109 if !self.is_append_only {
110 return Err(SinkError::Nats(anyhow!(
111 "NATS sink only supports append-only mode"
112 )));
113 }
114 let _client = (self.config.common.build_client().await)
115 .context("validate nats sink error")
116 .map_err(SinkError::Nats)?;
117 Ok(())
118 }
119
120 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
121 Ok(
122 NatsSinkWriter::new(self.config.clone(), self.schema.clone())
123 .await?
124 .into_log_sinker(NATS_SEND_FUTURE_BUFFER_MAX_SIZE),
125 )
126 }
127}
128
129impl NatsSinkWriter {
130 pub async fn new(config: NatsConfig, schema: Schema) -> Result<Self> {
131 let context = config
132 .common
133 .build_context()
134 .await
135 .map_err(|e| SinkError::Nats(anyhow!(e)))?;
136 Ok::<_, SinkError>(Self {
137 config: config.clone(),
138 context,
139 schema: schema.clone(),
140 json_encoder: JsonEncoder::new(
141 schema,
142 None,
143 DateHandlingMode::FromCe,
144 TimestampHandlingMode::Milli,
145 TimestamptzHandlingMode::UtcWithoutSuffix,
146 TimeHandlingMode::Milli,
147 JsonbHandlingMode::String,
148 ),
149 })
150 }
151}
152
153impl AsyncTruncateSinkWriter for NatsSinkWriter {
154 type DeliveryFuture = NatsSinkDeliveryFuture;
155
156 async fn write_chunk<'a>(
157 &'a mut self,
158 chunk: StreamChunk,
159 mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
160 ) -> Result<()> {
161 let mut data = chunk_to_json(chunk, &self.json_encoder)?;
162 for item in &mut data {
163 let publish_ack_future = Retry::spawn(
164 ExponentialBackoff::from_millis(100).map(jitter).take(3),
165 || async {
166 self.context
167 .publish(self.config.common.subject.clone(), item.clone().into())
168 .await
169 .context("nats sink error")
170 .map_err(SinkError::Nats)
171 },
172 )
173 .await
174 .context("nats sink error")
175 .map_err(SinkError::Nats)?;
176 let future = publish_ack_future.into_future().map(|result| {
177 result
178 .context("Nats sink error")
179 .map_err(SinkError::Nats)
180 .map(|_| ())
181 });
182 add_future.add_future_may_await(future).await?;
183 }
184 Ok(())
185 }
186}