risingwave_connector/sink/
http.rs1use std::collections::BTreeMap;
16
17use anyhow::{Context, anyhow};
18use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
19use risingwave_common::array::{Op, StreamChunk};
20use risingwave_common::catalog::Schema;
21use risingwave_common::row::Row;
22use risingwave_common::types::{DataType, ScalarRefImpl};
23use serde::Deserialize;
24use with_options::WithOptions;
25
26use crate::enforce_secret::EnforceSecret;
27use crate::sink::log_store::DeliveryFutureManagerAddFuture;
28use crate::sink::writer::{
29 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
30};
31use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, Sink, SinkError, SinkParam, SinkWriterParam};
32
/// Connector name of the HTTP sink; exposed as `Sink::SINK_NAME` for `HttpSink`.
pub const HTTP_SINK: &str = "http";
34
/// User-facing options of the HTTP sink, deserialized from the sink properties.
///
/// Properties whose key starts with `header.` are not part of this struct; they are
/// split off by `HttpConfig::from_btreemap` and returned separately.
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct HttpConfig {
    /// Endpoint that each row is POSTed to. It must parse as a `reqwest::Url`.
    pub url: String,

    /// Optional `Content-Type` header value. When absent, `validate_http_sink` picks
    /// `text/plain` for a varchar column and `application/json` for a jsonb column.
    pub content_type: Option<String>,

    /// Sink type; `from_btreemap` rejects any value other than `SINK_TYPE_APPEND_ONLY`.
    pub r#type: String,
}
47
// No items are overridden here, so `HttpConfig` relies entirely on whatever default
// behavior the `EnforceSecret` trait provides.
impl EnforceSecret for HttpConfig {}
49
50impl HttpConfig {
51 pub fn from_btreemap(
52 values: BTreeMap<String, String>,
53 ) -> Result<(Self, BTreeMap<String, String>)> {
54 let mut headers = BTreeMap::new();
56 let mut rest = BTreeMap::new();
57 for (k, v) in &values {
58 if let Some(header_name) = k.strip_prefix("header.") {
59 headers.insert(header_name.to_owned(), v.clone());
60 } else {
61 rest.insert(k.clone(), v.clone());
62 }
63 }
64
65 let config = serde_json::from_value::<HttpConfig>(serde_json::to_value(rest).unwrap())
66 .map_err(|e| SinkError::Config(anyhow!(e)))?;
67
68 if config.r#type != SINK_TYPE_APPEND_ONLY {
69 return Err(SinkError::Config(anyhow!(
70 "HTTP sink only supports append-only mode"
71 )));
72 }
73
74 Ok((config, headers))
75 }
76}
77
78fn validate_http_sink(
81 is_append_only: bool,
82 ignore_delete: bool,
83 schema: &Schema,
84 url: &str,
85 content_type: Option<&str>,
86 headers: &BTreeMap<String, String>,
87) -> Result<(reqwest::Url, HeaderMap)> {
88 if !is_append_only && !ignore_delete {
89 return Err(SinkError::Config(anyhow!(
90 "HTTP sink only supports append-only mode"
91 )));
92 }
93
94 if schema.fields().len() != 1 {
95 return Err(SinkError::Config(anyhow!(
96 "HTTP sink requires exactly 1 column, got {}",
97 schema.fields().len()
98 )));
99 }
100
101 let col_type = schema.fields()[0].data_type.clone();
102 if col_type != DataType::Varchar && col_type != DataType::Jsonb {
103 return Err(SinkError::Config(anyhow!(
104 "HTTP sink column must be varchar or jsonb, got {:?}",
105 col_type
106 )));
107 }
108
109 let parsed_url = url
110 .parse()
111 .context("invalid URL")
112 .map_err(SinkError::Config)?;
113
114 let mut header_map = HeaderMap::new();
115 header_map.insert(
116 CONTENT_TYPE,
117 content_type
118 .unwrap_or(match col_type {
119 DataType::Varchar => "text/plain",
120 DataType::Jsonb => "application/json",
121 _ => unreachable!("validated HTTP sink column type"),
122 })
123 .parse()
124 .context("invalid content_type")
125 .map_err(SinkError::Config)?,
126 );
127 for (k, v) in headers {
128 let name: HeaderName = k
129 .parse()
130 .with_context(|| format!("invalid header name '{k}'"))
131 .map_err(SinkError::Config)?;
132 let value: HeaderValue = v
133 .parse()
134 .with_context(|| format!("invalid header value for '{k}'"))
135 .map_err(SinkError::Config)?;
136 header_map.insert(name, value);
137 }
138
139 Ok((parsed_url, header_map))
140}
141
/// HTTP sink handle built from a `SinkParam`.
///
/// It holds what is needed to create an `HttpSinkWriter` in `new_log_sinker`.
#[derive(Clone, Debug)]
pub struct HttpSink {
    // The configured `url` string, kept as given after it passed URL validation.
    endpoint: String,
    // Validated headers (Content-Type plus any `header.*` properties).
    header_map: HeaderMap,
}
147
148impl EnforceSecret for HttpSink {
149 fn enforce_secret<'a>(
150 prop_iter: impl Iterator<Item = &'a str>,
151 ) -> crate::error::ConnectorResult<()> {
152 for prop in prop_iter {
153 HttpConfig::enforce_one(prop)?;
154 }
155 Ok(())
156 }
157}
158
159impl TryFrom<SinkParam> for HttpSink {
160 type Error = SinkError;
161
162 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
163 let schema = param.schema();
164 let (config, headers) = HttpConfig::from_btreemap(param.properties)?;
165 let (_parsed_url, header_map) = validate_http_sink(
166 param.sink_type.is_append_only(),
167 param.ignore_delete,
168 &schema,
169 &config.url,
170 config.content_type.as_deref(),
171 &headers,
172 )?;
173 Ok(Self {
174 endpoint: config.url,
175 header_map,
176 })
177 }
178}
179
180impl Sink for HttpSink {
181 type LogSinker = AsyncTruncateLogSinkerOf<HttpSinkWriter>;
182
183 const SINK_NAME: &'static str = HTTP_SINK;
184
185 async fn validate(&self) -> Result<()> {
186 Ok(())
187 }
188
189 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
190 Ok(
191 HttpSinkWriter::new(self.endpoint.clone(), self.header_map.clone())?
192 .into_log_sinker(usize::MAX),
193 )
194 }
195}
196
/// Writer that delivers rows by issuing HTTP POST requests.
pub struct HttpSinkWriter {
    // Client built once in `new`, with the sink's headers installed as defaults.
    client: reqwest::Client,
    // Target URL passed to `post` for every row.
    endpoint: String,
}
201
202impl HttpSinkWriter {
203 pub fn new(endpoint: String, header_map: HeaderMap) -> Result<Self> {
204 let client = reqwest::Client::builder()
205 .default_headers(header_map)
206 .build()
207 .context("failed to build HTTP client")
208 .map_err(SinkError::Http)?;
209
210 Ok(Self { client, endpoint })
211 }
212}
213
214impl AsyncTruncateSinkWriter for HttpSinkWriter {
215 async fn write_chunk<'a>(
216 &'a mut self,
217 chunk: StreamChunk,
218 _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
219 ) -> Result<()> {
220 for (op, row) in chunk.rows() {
221 if op != Op::Insert {
222 continue;
223 }
224
225 let payload = match row.datum_at(0) {
226 Some(ScalarRefImpl::Utf8(s)) => s.to_owned(),
227 Some(ScalarRefImpl::Jsonb(j)) => j.to_string(),
228 Some(_) => {
229 return Err(SinkError::Http(anyhow!(
230 "unexpected column type, expected varchar or jsonb"
231 )));
232 }
233 None => continue, };
235
236 let resp = self
237 .client
238 .post(&self.endpoint)
239 .body(payload)
240 .send()
241 .await
242 .context("HTTP request failed")
243 .map_err(SinkError::Http)?;
244
245 if !resp.status().is_success() {
246 let status = resp.status();
247 let body = resp.text().await.unwrap_or_default();
248 return Err(SinkError::Http(anyhow!(
249 "HTTP sink received non-success response: {} {}",
250 status,
251 body
252 )));
253 }
254 }
255
256 Ok(())
257 }
258}