risingwave_connector/sink/
http.rs1use std::collections::BTreeMap;
16
17use anyhow::{Context, anyhow};
18use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
19use risingwave_common::array::{Op, StreamChunk};
20use risingwave_common::catalog::Schema;
21use risingwave_common::row::Row;
22use risingwave_common::types::{DataType, JsonbRef, ScalarRefImpl};
23use serde::Deserialize;
24use thiserror_ext::AsReport;
25use with_options::WithOptions;
26
27use crate::enforce_secret::EnforceSecret;
28use crate::sink::log_store::DeliveryFutureManagerAddFuture;
29use crate::sink::writer::{
30 AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
31};
32use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, Sink, SinkError, SinkParam, SinkWriterParam};
33
/// Connector name of the HTTP sink; exposed as `Sink::SINK_NAME` by `HttpSink`.
pub const HTTP_SINK: &str = "http";
/// Name of the column that carries the request body when the sink schema has several columns.
const HTTP_SINK_PAYLOAD_COLUMN: &str = "payload";
/// Name of the optional varchar column that carries a per-row request URL.
const HTTP_SINK_URL_COLUMN: &str = "url";
37
/// Typed options of the HTTP sink, deserialized from the sink property map by
/// `HttpConfig::from_btreemap` (after the `header.*` entries have been split off).
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct HttpConfig {
    // Fixed request URL. Validation requires it whenever the schema has no `url` column and
    // rejects it when a `url` column is present.
    pub url: Option<String>,

    // Overrides the `Content-Type` header; when absent the header defaults to `text/plain`
    // for a varchar payload and `application/json` for a jsonb payload.
    pub content_type: Option<String>,

    // Sink type; `from_btreemap` rejects anything other than `SINK_TYPE_APPEND_ONLY`.
    pub r#type: String,
}
50
// Empty impl: `HttpConfig` overrides nothing and relies on whatever `EnforceSecret` provides.
impl EnforceSecret for HttpConfig {}
52
53impl HttpConfig {
54 pub fn from_btreemap(
55 values: BTreeMap<String, String>,
56 ) -> Result<(Self, BTreeMap<String, String>)> {
57 let mut headers = BTreeMap::new();
59 let mut rest = BTreeMap::new();
60 for (k, v) in &values {
61 if let Some(header_name) = k.strip_prefix("header.") {
62 headers.insert(header_name.to_owned(), v.clone());
63 } else {
64 rest.insert(k.clone(), v.clone());
65 }
66 }
67
68 let config = serde_json::from_value::<HttpConfig>(serde_json::to_value(rest).unwrap())
69 .map_err(|e| SinkError::Config(anyhow!(e)))?;
70
71 if config.r#type != SINK_TYPE_APPEND_ONLY {
72 return Err(SinkError::Config(anyhow!(
73 "HTTP sink only supports append-only mode"
74 )));
75 }
76
77 Ok((config, headers))
78 }
79}
80
/// Where the request URL of a row comes from.
#[derive(Clone, Debug)]
enum HttpUrl {
    /// One URL, parsed once during validation and reused for every row.
    Static(reqwest::Url),
    /// The URL is read per row from the column at `url_index`.
    Dynamic { url_index: usize },
}
86
87fn validate_http_sink(
90 is_append_only: bool,
91 ignore_delete: bool,
92 schema: &Schema,
93 url: Option<&str>,
94 content_type: Option<&str>,
95 headers: &BTreeMap<String, String>,
96) -> Result<HttpSink> {
97 if !is_append_only && !ignore_delete {
98 return Err(SinkError::Config(anyhow!(
99 "HTTP sink only supports append-only mode"
100 )));
101 }
102
103 let fields = schema.fields();
104 let (payload_index, url, payload_type) = if fields.len() == 1 {
105 let Some(url) = url else {
106 return Err(SinkError::Config(anyhow!(
107 "HTTP sink requires url option when schema has exactly 1 column"
108 )));
109 };
110 let url = url
111 .parse()
112 .context("invalid URL")
113 .map_err(SinkError::Config)?;
114 (0, HttpUrl::Static(url), fields[0].data_type.clone())
115 } else {
116 for field in fields {
117 match field.name.as_str() {
118 HTTP_SINK_PAYLOAD_COLUMN | HTTP_SINK_URL_COLUMN => {}
119 _ => {
120 return Err(SinkError::Config(anyhow!(
121 "HTTP sink with multiple columns only supports payload and url columns, got {}",
122 field.name
123 )));
124 }
125 }
126 }
127
128 let payload_index = fields
129 .iter()
130 .position(|field| field.name == HTTP_SINK_PAYLOAD_COLUMN)
131 .ok_or_else(|| {
132 SinkError::Config(anyhow!(
133 "HTTP sink with multiple columns requires a payload column"
134 ))
135 })?;
136 let url_index = fields
137 .iter()
138 .position(|field| field.name == HTTP_SINK_URL_COLUMN);
139 let url = match (url, url_index) {
140 (Some(_), Some(_)) => {
141 return Err(SinkError::Config(anyhow!(
142 "HTTP sink url option cannot coexist with url column"
143 )));
144 }
145 (Some(url), None) => {
146 let url = url
147 .parse()
148 .context("invalid URL")
149 .map_err(SinkError::Config)?;
150 HttpUrl::Static(url)
151 }
152 (None, Some(url_index)) => {
153 if fields[url_index].data_type != DataType::Varchar {
154 return Err(SinkError::Config(anyhow!(
155 "HTTP sink url column must be varchar, got {:?}",
156 fields[url_index].data_type
157 )));
158 }
159 HttpUrl::Dynamic { url_index }
160 }
161 (None, None) => {
162 return Err(SinkError::Config(anyhow!(
163 "HTTP sink requires either url option or url column"
164 )));
165 }
166 };
167
168 (payload_index, url, fields[payload_index].data_type.clone())
169 };
170
171 if payload_type != DataType::Varchar && payload_type != DataType::Jsonb {
172 return Err(SinkError::Config(anyhow!(
173 "HTTP sink payload column must be varchar or jsonb, got {:?}",
174 payload_type
175 )));
176 }
177
178 let mut header_map = HeaderMap::new();
179 header_map.insert(
180 CONTENT_TYPE,
181 content_type
182 .unwrap_or(match payload_type {
183 DataType::Varchar => "text/plain",
184 DataType::Jsonb => "application/json",
185 _ => unreachable!("validated HTTP sink column type"),
186 })
187 .parse()
188 .context("invalid content_type")
189 .map_err(SinkError::Config)?,
190 );
191 for (k, v) in headers {
192 let name: HeaderName = k
193 .parse()
194 .with_context(|| format!("invalid header name '{k}'"))
195 .map_err(SinkError::Config)?;
196 let value: HeaderValue = v
197 .parse()
198 .with_context(|| format!("invalid header value for '{k}'"))
199 .map_err(SinkError::Config)?;
200 header_map.insert(name, value);
201 }
202
203 Ok(HttpSink {
204 url,
205 payload_index,
206 header_map,
207 })
208}
209
/// A validated HTTP sink definition, as produced by `validate_http_sink`.
#[derive(Clone, Debug)]
pub struct HttpSink {
    /// Source of the request URL: fixed, or read from a column of each row.
    url: HttpUrl,
    /// Index of the column whose value is sent as the request body.
    payload_index: usize,
    /// Headers installed as default headers on the HTTP client of every writer.
    header_map: HeaderMap,
}
216
217impl EnforceSecret for HttpSink {
218 fn enforce_secret<'a>(
219 prop_iter: impl Iterator<Item = &'a str>,
220 ) -> crate::error::ConnectorResult<()> {
221 for prop in prop_iter {
222 HttpConfig::enforce_one(prop)?;
223 }
224 Ok(())
225 }
226}
227
228impl TryFrom<SinkParam> for HttpSink {
229 type Error = SinkError;
230
231 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
232 let schema = param.schema();
233 let (config, headers) = HttpConfig::from_btreemap(param.properties)?;
234 validate_http_sink(
235 param.sink_type.is_append_only(),
236 param.ignore_delete,
237 &schema,
238 config.url.as_deref(),
239 config.content_type.as_deref(),
240 &headers,
241 )
242 }
243}
244
245impl Sink for HttpSink {
246 type LogSinker = AsyncTruncateLogSinkerOf<HttpSinkWriter>;
247
248 const SINK_NAME: &'static str = HTTP_SINK;
249
250 async fn validate(&self) -> Result<()> {
251 Ok(())
252 }
253
254 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
255 Ok(HttpSinkWriter::new(
256 self.url.clone(),
257 self.payload_index,
258 self.header_map.clone(),
259 )?
260 .into_log_sinker(usize::MAX))
261 }
262}
263
/// Writer that sends one HTTP POST request per inserted row.
pub struct HttpSinkWriter {
    /// HTTP client built with the sink's header map as default headers.
    client: reqwest::Client,
    /// Source of the request URL: fixed, or read from a column of each row.
    url: HttpUrl,
    /// Index of the column whose value is sent as the request body.
    payload_index: usize,
}
269
270impl HttpSinkWriter {
271 fn new(url: HttpUrl, payload_index: usize, header_map: HeaderMap) -> Result<Self> {
272 let client = reqwest::Client::builder()
273 .default_headers(header_map)
274 .build()
275 .context("failed to build HTTP client")
276 .map_err(SinkError::Http)?;
277
278 Ok(Self {
279 client,
280 url,
281 payload_index,
282 })
283 }
284
285 fn extract_url(&self, row: &impl Row) -> Result<Option<reqwest::Url>> {
286 match &self.url {
287 HttpUrl::Static(url) => Ok(Some(url.clone())),
288 HttpUrl::Dynamic { url_index } => match row.datum_at(*url_index) {
289 Some(ScalarRefImpl::Utf8(url)) if !url.is_empty() => {
290 match url.parse::<reqwest::Url>() {
291 Ok(url) => Ok(Some(url)),
292 Err(err) => {
293 tracing::warn!(
294 error = %err.as_report(),
295 payload = %self.strip_payload_for_log(row),
296 "skip HTTP sink row due to invalid URL in url column"
297 );
298 Ok(None)
299 }
300 }
301 }
302 Some(ScalarRefImpl::Utf8(_)) | None => {
303 tracing::warn!(
304 payload = %self.strip_payload_for_log(row),
305 "skip HTTP sink row due to null or empty url column"
306 );
307 Ok(None)
308 }
309 Some(_) => Err(SinkError::Http(anyhow!(
310 "unexpected url column type, expected varchar"
311 ))),
312 },
313 }
314 }
315
316 fn strip_payload_for_log(&self, row: &impl Row) -> String {
317 match row.datum_at(self.payload_index) {
318 Some(ScalarRefImpl::Utf8(s)) => strip_text_payload(s),
319 Some(ScalarRefImpl::Jsonb(j)) => strip_jsonb_payload(j),
320 Some(_) => "<unexpected payload type>".to_owned(),
321 None => "NULL".to_owned(),
322 }
323 }
324
325 fn extract_payload(&self, row: &impl Row) -> Result<Option<String>> {
326 Ok(match row.datum_at(self.payload_index) {
327 Some(ScalarRefImpl::Utf8(s)) => Some(s.to_owned()),
328 Some(ScalarRefImpl::Jsonb(j)) => Some(j.to_string()),
329 Some(_) => {
330 return Err(SinkError::Http(anyhow!(
331 "unexpected payload column type, expected varchar or jsonb"
332 )));
333 }
334 None => None, })
336 }
337}
338
/// Shortens `payload` for logging.
///
/// Payloads of at most 200 characters are returned unchanged. Longer payloads are reduced to
/// their first 100 and last 100 characters joined by `...`. Lengths are measured in `char`s,
/// so multi-byte characters are never split.
fn strip_text_payload(payload: &str) -> String {
    const EDGE_CHAR_COUNT: usize = 100;

    // Byte offset just past the first `EDGE_CHAR_COUNT` characters. `None` means the payload
    // has at most `EDGE_CHAR_COUNT` characters, which is short enough to keep whole.
    let Some((prefix_end, _)) = payload.char_indices().nth(EDGE_CHAR_COUNT) else {
        return payload.to_owned();
    };
    // Byte offset where the last `EDGE_CHAR_COUNT` characters start. Walking from the back
    // keeps the work bounded by the edge size instead of the payload size.
    let Some((suffix_start, _)) = payload.char_indices().rev().nth(EDGE_CHAR_COUNT - 1) else {
        return payload.to_owned();
    };
    // The two edges touch or overlap exactly when the payload has at most
    // `2 * EDGE_CHAR_COUNT` characters; there is nothing to cut out in that case.
    if prefix_end >= suffix_start {
        return payload.to_owned();
    }

    format!("{}...{}", &payload[..prefix_end], &payload[suffix_start..])
}
350
351fn strip_jsonb_payload(payload: JsonbRef<'_>) -> String {
352 if payload.is_array() {
353 let len = payload.array_len().expect("checked JSON array type");
354 return match len {
355 0 => "[]".to_owned(),
356 1 => {
357 let first = payload.access_array_element(0).expect("JSON array element");
358 format!("[{}]", strip_jsonb_payload(first))
359 }
360 2 => {
361 let first = payload.access_array_element(0).expect("JSON array element");
362 let second = payload.access_array_element(1).expect("JSON array element");
363 format!(
364 "[{},{}]",
365 strip_jsonb_payload(first),
366 strip_jsonb_payload(second)
367 )
368 }
369 _ => {
370 let first = payload.access_array_element(0).expect("JSON array element");
371 let last = payload
372 .access_array_element(len - 1)
373 .expect("JSON array element");
374 format!(
375 "[{},...,{}]",
376 strip_jsonb_payload(first),
377 strip_jsonb_payload(last)
378 )
379 }
380 };
381 }
382
383 if let Ok(fields) = payload.object_key_values() {
384 let fields = fields
385 .map(|(key, value)| {
386 let key = serde_json::to_string(key).expect("serialize JSON object key");
387 format!("{key}:{}", strip_jsonb_payload(value))
388 })
389 .collect::<Vec<_>>();
390 return format!("{{{}}}", fields.join(","));
391 }
392
393 if let Ok(value) = payload.as_str() {
394 return serde_json::to_string(&strip_text_payload(value)).expect("serialize JSON string");
395 }
396
397 payload.to_string()
398}
399
400impl AsyncTruncateSinkWriter for HttpSinkWriter {
401 async fn write_chunk<'a>(
402 &'a mut self,
403 chunk: StreamChunk,
404 _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
405 ) -> Result<()> {
406 for (op, row) in chunk.rows() {
407 if op != Op::Insert {
408 continue;
409 }
410
411 let Some(payload) = self.extract_payload(&row)? else {
412 continue;
413 };
414 let Some(url) = self.extract_url(&row)? else {
415 continue;
416 };
417
418 let resp = self
419 .client
420 .post(url)
421 .body(payload)
422 .send()
423 .await
424 .context("HTTP request failed")
425 .map_err(SinkError::Http)?;
426
427 if !resp.status().is_success() {
428 let status = resp.status();
429 let body = resp.text().await.unwrap_or_default();
430 return Err(SinkError::Http(anyhow!(
431 "HTTP sink received non-success response: {} {}",
432 status,
433 body
434 )));
435 }
436 }
437
438 Ok(())
439 }
440}
441
#[cfg(test)]
mod tests {
    use risingwave_common::types::{JsonbVal, Scalar};

    use super::*;

    /// Parses `json` and returns its shortened log rendering.
    fn stripped_json(json: &str) -> String {
        let value: JsonbVal = json.parse().unwrap();
        strip_jsonb_payload(value.as_scalar_ref())
    }

    #[test]
    fn test_strip_text_payload() {
        // Short payloads are kept as they are.
        assert_eq!(strip_text_payload("short payload"), "short payload");

        // Exactly 200 characters is still within the limit.
        let at_limit = format!("{}{}", "a".repeat(100), "z".repeat(100));
        assert_eq!(strip_text_payload(&at_limit), at_limit);

        // Anything longer keeps only 100 characters from each end.
        let over_limit = format!("{}middle{}", "a".repeat(101), "z".repeat(101));
        let expected = format!("{}...{}", "a".repeat(100), "z".repeat(100));
        assert_eq!(strip_text_payload(&over_limit), expected);
    }

    #[test]
    fn test_strip_jsonb_payload() {
        // Arrays with more than two elements keep only their first and last element.
        let nested = r#"{
            "id": 1,
            "items": [1, {"nested": ["first", "middle", "last"]}, 3],
            "message": "short"
        }"#;
        assert_eq!(
            stripped_json(nested),
            r#"{"id":1,"items":[1,...,3],"message":"short"}"#
        );

        // Arrays with one or two elements are kept complete.
        assert_eq!(stripped_json(r#"["only"]"#), r#"["only"]"#);
        assert_eq!(
            stripped_json(r#"["first","last"]"#),
            r#"["first","last"]"#
        );
    }
}