risingwave_connector/sink/
http.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use anyhow::{Context, anyhow};
18use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
19use risingwave_common::array::{Op, StreamChunk};
20use risingwave_common::catalog::Schema;
21use risingwave_common::row::Row;
22use risingwave_common::types::{DataType, ScalarRefImpl};
23use serde::Deserialize;
24use with_options::WithOptions;
25
26use crate::enforce_secret::EnforceSecret;
27use crate::sink::log_store::DeliveryFutureManagerAddFuture;
28use crate::sink::writer::{
29    AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
30};
31use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, Sink, SinkError, SinkParam, SinkWriterParam};
32
/// Name of the HTTP sink connector; reported as `SINK_NAME` by the `Sink` impl of `HttpSink`.
pub const HTTP_SINK: &str = "http";
34
/// Options of the HTTP sink, deserialized from the sink's string properties.
///
/// `header.*` properties are not fields of this struct: `HttpConfig::from_btreemap` splits them
/// off before deserialization and returns them separately.
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct HttpConfig {
    /// The endpoint URL to POST data to.
    pub url: String,

    /// Content-Type header value. Defaults to `text/plain` for `varchar` and `application/json`
    /// for `jsonb`.
    pub content_type: Option<String>,

    /// Sink type, must be "append-only".
    pub r#type: String,
}
47
// Empty impl: `HttpConfig` overrides nothing and relies on whatever default items the
// `EnforceSecret` trait provides.
impl EnforceSecret for HttpConfig {}
49
50impl HttpConfig {
51    pub fn from_btreemap(
52        values: BTreeMap<String, String>,
53    ) -> Result<(Self, BTreeMap<String, String>)> {
54        // Extract header.* keys before serde parsing
55        let mut headers = BTreeMap::new();
56        let mut rest = BTreeMap::new();
57        for (k, v) in &values {
58            if let Some(header_name) = k.strip_prefix("header.") {
59                headers.insert(header_name.to_owned(), v.clone());
60            } else {
61                rest.insert(k.clone(), v.clone());
62            }
63        }
64
65        let config = serde_json::from_value::<HttpConfig>(serde_json::to_value(rest).unwrap())
66            .map_err(|e| SinkError::Config(anyhow!(e)))?;
67
68        if config.r#type != SINK_TYPE_APPEND_ONLY {
69            return Err(SinkError::Config(anyhow!(
70                "HTTP sink only supports append-only mode"
71            )));
72        }
73
74        Ok((config, headers))
75    }
76}
77
78/// Validates the HTTP sink parameters and returns the parsed URL and default headers so callers
79/// can use them directly without re-parsing.
80fn validate_http_sink(
81    is_append_only: bool,
82    ignore_delete: bool,
83    schema: &Schema,
84    url: &str,
85    content_type: Option<&str>,
86    headers: &BTreeMap<String, String>,
87) -> Result<(reqwest::Url, HeaderMap)> {
88    if !is_append_only && !ignore_delete {
89        return Err(SinkError::Config(anyhow!(
90            "HTTP sink only supports append-only mode"
91        )));
92    }
93
94    if schema.fields().len() != 1 {
95        return Err(SinkError::Config(anyhow!(
96            "HTTP sink requires exactly 1 column, got {}",
97            schema.fields().len()
98        )));
99    }
100
101    let col_type = schema.fields()[0].data_type.clone();
102    if col_type != DataType::Varchar && col_type != DataType::Jsonb {
103        return Err(SinkError::Config(anyhow!(
104            "HTTP sink column must be varchar or jsonb, got {:?}",
105            col_type
106        )));
107    }
108
109    let parsed_url = url
110        .parse()
111        .context("invalid URL")
112        .map_err(SinkError::Config)?;
113
114    let mut header_map = HeaderMap::new();
115    header_map.insert(
116        CONTENT_TYPE,
117        content_type
118            .unwrap_or(match col_type {
119                DataType::Varchar => "text/plain",
120                DataType::Jsonb => "application/json",
121                _ => unreachable!("validated HTTP sink column type"),
122            })
123            .parse()
124            .context("invalid content_type")
125            .map_err(SinkError::Config)?,
126    );
127    for (k, v) in headers {
128        let name: HeaderName = k
129            .parse()
130            .with_context(|| format!("invalid header name '{k}'"))
131            .map_err(SinkError::Config)?;
132        let value: HeaderValue = v
133            .parse()
134            .with_context(|| format!("invalid header value for '{k}'"))
135            .map_err(SinkError::Config)?;
136        header_map.insert(name, value);
137    }
138
139    Ok((parsed_url, header_map))
140}
141
/// HTTP sink built from a `SinkParam`; holds what is needed to create writers.
#[derive(Clone, Debug)]
pub struct HttpSink {
    /// Endpoint URL string, taken verbatim from `HttpConfig::url`.
    endpoint: String,
    /// Headers produced by `validate_http_sink`; cloned into each writer, which installs them as
    /// its client's default headers.
    header_map: HeaderMap,
}
147
148impl EnforceSecret for HttpSink {
149    fn enforce_secret<'a>(
150        prop_iter: impl Iterator<Item = &'a str>,
151    ) -> crate::error::ConnectorResult<()> {
152        for prop in prop_iter {
153            HttpConfig::enforce_one(prop)?;
154        }
155        Ok(())
156    }
157}
158
159impl TryFrom<SinkParam> for HttpSink {
160    type Error = SinkError;
161
162    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
163        let schema = param.schema();
164        let (config, headers) = HttpConfig::from_btreemap(param.properties)?;
165        let (_parsed_url, header_map) = validate_http_sink(
166            param.sink_type.is_append_only(),
167            param.ignore_delete,
168            &schema,
169            &config.url,
170            config.content_type.as_deref(),
171            &headers,
172        )?;
173        Ok(Self {
174            endpoint: config.url,
175            header_map,
176        })
177    }
178}
179
180impl Sink for HttpSink {
181    type LogSinker = AsyncTruncateLogSinkerOf<HttpSinkWriter>;
182
183    const SINK_NAME: &'static str = HTTP_SINK;
184
185    async fn validate(&self) -> Result<()> {
186        Ok(())
187    }
188
189    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
190        Ok(
191            HttpSinkWriter::new(self.endpoint.clone(), self.header_map.clone())?
192                .into_log_sinker(usize::MAX),
193        )
194    }
195}
196
/// Writer that sends each inserted row to the endpoint in its own HTTP POST request.
pub struct HttpSinkWriter {
    /// HTTP client built with the sink's default headers.
    client: reqwest::Client,
    /// URL string every request is posted to.
    endpoint: String,
}
201
202impl HttpSinkWriter {
203    pub fn new(endpoint: String, header_map: HeaderMap) -> Result<Self> {
204        let client = reqwest::Client::builder()
205            .default_headers(header_map)
206            .build()
207            .context("failed to build HTTP client")
208            .map_err(SinkError::Http)?;
209
210        Ok(Self { client, endpoint })
211    }
212}
213
214impl AsyncTruncateSinkWriter for HttpSinkWriter {
215    async fn write_chunk<'a>(
216        &'a mut self,
217        chunk: StreamChunk,
218        _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
219    ) -> Result<()> {
220        for (op, row) in chunk.rows() {
221            if op != Op::Insert {
222                continue;
223            }
224
225            let payload = match row.datum_at(0) {
226                Some(ScalarRefImpl::Utf8(s)) => s.to_owned(),
227                Some(ScalarRefImpl::Jsonb(j)) => j.to_string(),
228                Some(_) => {
229                    return Err(SinkError::Http(anyhow!(
230                        "unexpected column type, expected varchar or jsonb"
231                    )));
232                }
233                None => continue, // skip NULL rows
234            };
235
236            let resp = self
237                .client
238                .post(&self.endpoint)
239                .body(payload)
240                .send()
241                .await
242                .context("HTTP request failed")
243                .map_err(SinkError::Http)?;
244
245            if !resp.status().is_success() {
246                let status = resp.status();
247                let body = resp.text().await.unwrap_or_default();
248                return Err(SinkError::Http(anyhow!(
249                    "HTTP sink received non-success response: {} {}",
250                    status,
251                    body
252                )));
253            }
254        }
255
256        Ok(())
257    }
258}