risingwave_connector/sink/
http.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use anyhow::{Context, anyhow};
18use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
19use risingwave_common::array::{Op, StreamChunk};
20use risingwave_common::catalog::Schema;
21use risingwave_common::row::Row;
22use risingwave_common::types::{DataType, JsonbRef, ScalarRefImpl};
23use serde::Deserialize;
24use thiserror_ext::AsReport;
25use with_options::WithOptions;
26
27use crate::enforce_secret::EnforceSecret;
28use crate::sink::log_store::DeliveryFutureManagerAddFuture;
29use crate::sink::writer::{
30    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
31};
32use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, Sink, SinkError, SinkParam, SinkWriterParam};
33
/// Name of this sink connector; also exposed as `HttpSink::SINK_NAME`.
pub const HTTP_SINK: &str = "http";
/// Name of the optional column carrying a per-row target URL in multi-column sinks.
const HTTP_SINK_URL_COLUMN: &str = "url";
/// Name of the column carrying the request body in multi-column sinks.
const HTTP_SINK_PAYLOAD_COLUMN: &str = "payload";
37
/// User-facing options of the HTTP sink, deserialized from the sink's property map.
///
/// Properties whose key starts with `header.` are not part of this struct:
/// `HttpConfig::from_btreemap` splits them off and returns them separately as extra request
/// headers.
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct HttpConfig {
    /// The endpoint URL to POST data to.
    pub url: Option<String>,

    /// Content-Type header value. Defaults to `text/plain` for `varchar` and `application/json`
    /// for `jsonb`.
    pub content_type: Option<String>,

    /// Sink type, must be "append-only".
    pub r#type: String,
}
50
// Empty impl: no `HttpConfig` option is declared secret here; everything comes from the trait's
// default methods.
// NOTE(review): `header.*` properties (e.g. an `Authorization` header) may carry credentials;
// whether the trait default treats them as secrets is not visible here — confirm.
impl EnforceSecret for HttpConfig {}
52
53impl HttpConfig {
54    pub fn from_btreemap(
55        values: BTreeMap<String, String>,
56    ) -> Result<(Self, BTreeMap<String, String>)> {
57        // Extract header.* keys before serde parsing
58        let mut headers = BTreeMap::new();
59        let mut rest = BTreeMap::new();
60        for (k, v) in &values {
61            if let Some(header_name) = k.strip_prefix("header.") {
62                headers.insert(header_name.to_owned(), v.clone());
63            } else {
64                rest.insert(k.clone(), v.clone());
65            }
66        }
67
68        let config = serde_json::from_value::<HttpConfig>(serde_json::to_value(rest).unwrap())
69            .map_err(|e| SinkError::Config(anyhow!(e)))?;
70
71        if config.r#type != SINK_TYPE_APPEND_ONLY {
72            return Err(SinkError::Config(anyhow!(
73                "HTTP sink only supports append-only mode"
74            )));
75        }
76
77        Ok((config, headers))
78    }
79}
80
/// Where the sink sends each row's request.
#[derive(Clone, Debug)]
enum HttpUrl {
    /// A single endpoint taken from the `url` option, used for every row.
    Static(reqwest::Url),
    /// A per-row endpoint read from the varchar column at `url_index`.
    Dynamic { url_index: usize },
}
86
87/// Validates the HTTP sink parameters and returns the sink so callers can use it directly without
88/// re-parsing.
89fn validate_http_sink(
90    is_append_only: bool,
91    ignore_delete: bool,
92    schema: &Schema,
93    url: Option<&str>,
94    content_type: Option<&str>,
95    headers: &BTreeMap<String, String>,
96) -> Result<HttpSink> {
97    if !is_append_only && !ignore_delete {
98        return Err(SinkError::Config(anyhow!(
99            "HTTP sink only supports append-only mode"
100        )));
101    }
102
103    let fields = schema.fields();
104    let (payload_index, url, payload_type) = if fields.len() == 1 {
105        let Some(url) = url else {
106            return Err(SinkError::Config(anyhow!(
107                "HTTP sink requires url option when schema has exactly 1 column"
108            )));
109        };
110        let url = url
111            .parse()
112            .context("invalid URL")
113            .map_err(SinkError::Config)?;
114        (0, HttpUrl::Static(url), fields[0].data_type.clone())
115    } else {
116        for field in fields {
117            match field.name.as_str() {
118                HTTP_SINK_PAYLOAD_COLUMN | HTTP_SINK_URL_COLUMN => {}
119                _ => {
120                    return Err(SinkError::Config(anyhow!(
121                        "HTTP sink with multiple columns only supports payload and url columns, got {}",
122                        field.name
123                    )));
124                }
125            }
126        }
127
128        let payload_index = fields
129            .iter()
130            .position(|field| field.name == HTTP_SINK_PAYLOAD_COLUMN)
131            .ok_or_else(|| {
132                SinkError::Config(anyhow!(
133                    "HTTP sink with multiple columns requires a payload column"
134                ))
135            })?;
136        let url_index = fields
137            .iter()
138            .position(|field| field.name == HTTP_SINK_URL_COLUMN);
139        let url = match (url, url_index) {
140            (Some(_), Some(_)) => {
141                return Err(SinkError::Config(anyhow!(
142                    "HTTP sink url option cannot coexist with url column"
143                )));
144            }
145            (Some(url), None) => {
146                let url = url
147                    .parse()
148                    .context("invalid URL")
149                    .map_err(SinkError::Config)?;
150                HttpUrl::Static(url)
151            }
152            (None, Some(url_index)) => {
153                if fields[url_index].data_type != DataType::Varchar {
154                    return Err(SinkError::Config(anyhow!(
155                        "HTTP sink url column must be varchar, got {:?}",
156                        fields[url_index].data_type
157                    )));
158                }
159                HttpUrl::Dynamic { url_index }
160            }
161            (None, None) => {
162                return Err(SinkError::Config(anyhow!(
163                    "HTTP sink requires either url option or url column"
164                )));
165            }
166        };
167
168        (payload_index, url, fields[payload_index].data_type.clone())
169    };
170
171    if payload_type != DataType::Varchar && payload_type != DataType::Jsonb {
172        return Err(SinkError::Config(anyhow!(
173            "HTTP sink payload column must be varchar or jsonb, got {:?}",
174            payload_type
175        )));
176    }
177
178    let mut header_map = HeaderMap::new();
179    header_map.insert(
180        CONTENT_TYPE,
181        content_type
182            .unwrap_or(match payload_type {
183                DataType::Varchar => "text/plain",
184                DataType::Jsonb => "application/json",
185                _ => unreachable!("validated HTTP sink column type"),
186            })
187            .parse()
188            .context("invalid content_type")
189            .map_err(SinkError::Config)?,
190    );
191    for (k, v) in headers {
192        let name: HeaderName = k
193            .parse()
194            .with_context(|| format!("invalid header name '{k}'"))
195            .map_err(SinkError::Config)?;
196        let value: HeaderValue = v
197            .parse()
198            .with_context(|| format!("invalid header value for '{k}'"))
199            .map_err(SinkError::Config)?;
200        header_map.insert(name, value);
201    }
202
203    Ok(HttpSink {
204        url,
205        payload_index,
206        header_map,
207    })
208}
209
/// Validated HTTP sink: where each row is sent, which column holds the request body, and the
/// headers attached to every request.
#[derive(Clone, Debug)]
pub struct HttpSink {
    /// Fixed endpoint, or the index of the per-row `url` column.
    url: HttpUrl,
    /// Index of the payload column in the sink schema.
    payload_index: usize,
    /// `Content-Type` plus user `header.*` entries; used as the writer client's default headers.
    header_map: HeaderMap,
}
216
217impl EnforceSecret for HttpSink {
218    fn enforce_secret<'a>(
219        prop_iter: impl Iterator<Item = &'a str>,
220    ) -> crate::error::ConnectorResult<()> {
221        for prop in prop_iter {
222            HttpConfig::enforce_one(prop)?;
223        }
224        Ok(())
225    }
226}
227
228impl TryFrom<SinkParam> for HttpSink {
229    type Error = SinkError;
230
231    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
232        let schema = param.schema();
233        let (config, headers) = HttpConfig::from_btreemap(param.properties)?;
234        validate_http_sink(
235            param.sink_type.is_append_only(),
236            param.ignore_delete,
237            &schema,
238            config.url.as_deref(),
239            config.content_type.as_deref(),
240            &headers,
241        )
242    }
243}
244
245impl Sink for HttpSink {
246    type LogSinker = AsyncTruncateLogSinkerOf<HttpSinkWriter>;
247
248    const SINK_NAME: &'static str = HTTP_SINK;
249
250    async fn validate(&self) -> Result<()> {
251        Ok(())
252    }
253
254    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
255        Ok(HttpSinkWriter::new(
256            self.url.clone(),
257            self.payload_index,
258            self.header_map.clone(),
259        )?
260        .into_log_sinker(usize::MAX))
261    }
262}
263
/// Writer that POSTs the payload of each inserted row, one request per row.
pub struct HttpSinkWriter {
    /// Client built with the sink's headers as its default headers.
    client: reqwest::Client,
    /// Fixed endpoint, or the index of the per-row `url` column.
    url: HttpUrl,
    /// Index of the payload column.
    payload_index: usize,
}
269
270impl HttpSinkWriter {
271    fn new(url: HttpUrl, payload_index: usize, header_map: HeaderMap) -> Result<Self> {
272        let client = reqwest::Client::builder()
273            .default_headers(header_map)
274            .build()
275            .context("failed to build HTTP client")
276            .map_err(SinkError::Http)?;
277
278        Ok(Self {
279            client,
280            url,
281            payload_index,
282        })
283    }
284
285    fn extract_url(&self, row: &impl Row) -> Result<Option<reqwest::Url>> {
286        match &self.url {
287            HttpUrl::Static(url) => Ok(Some(url.clone())),
288            HttpUrl::Dynamic { url_index } => match row.datum_at(*url_index) {
289                Some(ScalarRefImpl::Utf8(url)) if !url.is_empty() => {
290                    match url.parse::<reqwest::Url>() {
291                        Ok(url) => Ok(Some(url)),
292                        Err(err) => {
293                            tracing::warn!(
294                                error = %err.as_report(),
295                                payload = %self.strip_payload_for_log(row),
296                                "skip HTTP sink row due to invalid URL in url column"
297                            );
298                            Ok(None)
299                        }
300                    }
301                }
302                Some(ScalarRefImpl::Utf8(_)) | None => {
303                    tracing::warn!(
304                        payload = %self.strip_payload_for_log(row),
305                        "skip HTTP sink row due to null or empty url column"
306                    );
307                    Ok(None)
308                }
309                Some(_) => Err(SinkError::Http(anyhow!(
310                    "unexpected url column type, expected varchar"
311                ))),
312            },
313        }
314    }
315
316    fn strip_payload_for_log(&self, row: &impl Row) -> String {
317        match row.datum_at(self.payload_index) {
318            Some(ScalarRefImpl::Utf8(s)) => strip_text_payload(s),
319            Some(ScalarRefImpl::Jsonb(j)) => strip_jsonb_payload(j),
320            Some(_) => "<unexpected payload type>".to_owned(),
321            None => "NULL".to_owned(),
322        }
323    }
324
325    fn extract_payload(&self, row: &impl Row) -> Result<Option<String>> {
326        Ok(match row.datum_at(self.payload_index) {
327            Some(ScalarRefImpl::Utf8(s)) => Some(s.to_owned()),
328            Some(ScalarRefImpl::Jsonb(j)) => Some(j.to_string()),
329            Some(_) => {
330                return Err(SinkError::Http(anyhow!(
331                    "unexpected payload column type, expected varchar or jsonb"
332                )));
333            }
334            None => None, // skip NULL rows
335        })
336    }
337}
338
/// Shortens `payload` for logging.
///
/// Text of at most 200 characters is returned unchanged. Longer text is reduced to its first and
/// last 100 characters joined by `...`. Lengths are counted in `char`s, so multi-byte UTF-8 text
/// is never cut in the middle of a character.
fn strip_text_payload(payload: &str) -> String {
    const EDGE_CHAR_COUNT: usize = 100;

    // `nth(2 * EDGE)` is `None` exactly when the text has at most `2 * EDGE` characters.
    if payload.chars().nth(EDGE_CHAR_COUNT * 2).is_none() {
        return payload.to_owned();
    }

    // Byte offset just past the leading `EDGE_CHAR_COUNT` characters.
    let head_end = payload
        .char_indices()
        .nth(EDGE_CHAR_COUNT)
        .map_or(payload.len(), |(offset, _)| offset);
    // Byte offset where the trailing `EDGE_CHAR_COUNT` characters begin.
    let tail_start = payload
        .char_indices()
        .rev()
        .nth(EDGE_CHAR_COUNT - 1)
        .map_or(0, |(offset, _)| offset);

    format!("{}...{}", &payload[..head_end], &payload[tail_start..])
}
350
351fn strip_jsonb_payload(payload: JsonbRef<'_>) -> String {
352    if payload.is_array() {
353        let len = payload.array_len().expect("checked JSON array type");
354        return match len {
355            0 => "[]".to_owned(),
356            1 => {
357                let first = payload.access_array_element(0).expect("JSON array element");
358                format!("[{}]", strip_jsonb_payload(first))
359            }
360            2 => {
361                let first = payload.access_array_element(0).expect("JSON array element");
362                let second = payload.access_array_element(1).expect("JSON array element");
363                format!(
364                    "[{},{}]",
365                    strip_jsonb_payload(first),
366                    strip_jsonb_payload(second)
367                )
368            }
369            _ => {
370                let first = payload.access_array_element(0).expect("JSON array element");
371                let last = payload
372                    .access_array_element(len - 1)
373                    .expect("JSON array element");
374                format!(
375                    "[{},...,{}]",
376                    strip_jsonb_payload(first),
377                    strip_jsonb_payload(last)
378                )
379            }
380        };
381    }
382
383    if let Ok(fields) = payload.object_key_values() {
384        let fields = fields
385            .map(|(key, value)| {
386                let key = serde_json::to_string(key).expect("serialize JSON object key");
387                format!("{key}:{}", strip_jsonb_payload(value))
388            })
389            .collect::<Vec<_>>();
390        return format!("{{{}}}", fields.join(","));
391    }
392
393    if let Ok(value) = payload.as_str() {
394        return serde_json::to_string(&strip_text_payload(value)).expect("serialize JSON string");
395    }
396
397    payload.to_string()
398}
399
400impl AsyncTruncateSinkWriter for HttpSinkWriter {
401    async fn write_chunk<'a>(
402        &'a mut self,
403        chunk: StreamChunk,
404        _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
405    ) -> Result<()> {
406        for (op, row) in chunk.rows() {
407            if op != Op::Insert {
408                continue;
409            }
410
411            let Some(payload) = self.extract_payload(&row)? else {
412                continue;
413            };
414            let Some(url) = self.extract_url(&row)? else {
415                continue;
416            };
417
418            let resp = self
419                .client
420                .post(url)
421                .body(payload)
422                .send()
423                .await
424                .context("HTTP request failed")
425                .map_err(SinkError::Http)?;
426
427            if !resp.status().is_success() {
428                let status = resp.status();
429                let body = resp.text().await.unwrap_or_default();
430                return Err(SinkError::Http(anyhow!(
431                    "HTTP sink received non-success response: {} {}",
432                    status,
433                    body
434                )));
435            }
436        }
437
438        Ok(())
439    }
440}
441
#[cfg(test)]
mod tests {
    use risingwave_common::types::{JsonbVal, Scalar};

    use super::*;

    /// Parses `json` as JSONB and returns its `strip_jsonb_payload` preview.
    fn strip_json(json: &str) -> String {
        let payload: JsonbVal = json.parse().unwrap();
        strip_jsonb_payload(payload.as_scalar_ref())
    }

    #[test]
    fn test_strip_text_payload() {
        assert_eq!(strip_text_payload(""), "");
        assert_eq!(strip_text_payload("short payload"), "short payload");

        let payload = format!("{}{}", "a".repeat(100), "z".repeat(100));
        assert_eq!(strip_text_payload(&payload), payload);

        let payload = format!("{}middle{}", "a".repeat(101), "z".repeat(101));
        assert_eq!(
            strip_text_payload(&payload),
            format!("{}...{}", "a".repeat(100), "z".repeat(100))
        );
    }

    #[test]
    fn test_strip_text_payload_counts_chars_not_bytes() {
        // 150 two-byte characters exceed 200 bytes but not 200 characters.
        let payload = "é".repeat(150);
        assert_eq!(strip_text_payload(&payload), payload);

        let payload = "é".repeat(201);
        assert_eq!(
            strip_text_payload(&payload),
            format!("{}...{}", "é".repeat(100), "é".repeat(100))
        );
    }

    #[test]
    fn test_strip_jsonb_payload() {
        assert_eq!(
            strip_json(
                r#"{
            "id": 1,
            "items": [1, {"nested": ["first", "middle", "last"]}, 3],
            "message": "short"
        }"#
            ),
            r#"{"id":1,"items":[1,...,3],"message":"short"}"#
        );

        assert_eq!(strip_json(r#"["only"]"#), r#"["only"]"#);
        assert_eq!(strip_json(r#"["first","last"]"#), r#"["first","last"]"#);
    }

    #[test]
    fn test_strip_jsonb_payload_edge_cases() {
        assert_eq!(strip_json("[]"), "[]");
        assert_eq!(strip_json("{}"), "{}");
        assert_eq!(strip_json("42"), "42");
        assert_eq!(strip_json("true"), "true");
        assert_eq!(strip_json("null"), "null");

        // Strings are re-escaped after shortening.
        assert_eq!(strip_json(r#"{"k":"a\"b"}"#), r#"{"k":"a\"b"}"#);

        // Long strings nested in objects are shortened like plain text payloads.
        let long = "x".repeat(250);
        assert_eq!(
            strip_json(&format!(r#"{{"msg":"{long}"}}"#)),
            format!(r#"{{"msg":"{}...{}"}}"#, "x".repeat(100), "x".repeat(100))
        );
    }
}