risingwave_frontend/webhook/
payload.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use axum::http::{HeaderMap, StatusCode};
17use risingwave_common::row::OwnedRow;
18use risingwave_common::types::ToOwnedDatum;
19use risingwave_connector::parser::{
20    Access, AccessResult, BigintUnsignedHandlingMode, JsonAccessBuilder, JsonProperties,
21    TimeHandling, TimestampHandling, TimestamptzHandling,
22};
23
24use super::WebhookTableColumnDesc;
25use super::utils::{Result, err};
26
/// Request header selecting the payload format. `json_properties_from_headers`
/// accepts only `plain` (compared ASCII case-insensitively) and treats a
/// missing header as `plain`.
const WEBHOOK_FORMAT_HEADER: &str = "x-rw-webhook-format";

/// Request header selecting the payload encoding. `json_properties_from_headers`
/// accepts only `json` (compared ASCII case-insensitively) and treats a
/// missing header as `json`.
const WEBHOOK_ENCODE_HEADER: &str = "x-rw-webhook-encode";

/// Request header choosing the JSON timestamp handling mode
/// (`milli` or `guess_number_unit`).
const WEBHOOK_JSON_TIMESTAMP_HANDLING_HEADER: &str = "x-rw-webhook-json-timestamp-handling-mode";

/// Request header choosing the JSON timestamptz handling mode; its value is
/// handed to `TimestamptzHandling::from_options` for validation.
const WEBHOOK_JSON_TIMESTAMPTZ_HANDLING_HEADER: &str =
    "x-rw-webhook-json-timestamptz-handling-mode";

/// Request header choosing the JSON time handling mode (`milli` or `micro`).
const WEBHOOK_JSON_TIME_HANDLING_HEADER: &str = "x-rw-webhook-json-time-handling-mode";

/// Request header choosing the unsigned-bigint handling mode
/// (`long` or `precise`).
const WEBHOOK_JSON_BIGINT_UNSIGNED_HANDLING_HEADER: &str =
    "x-rw-webhook-json-bigint-unsigned-handling-mode";

/// Request header toggling TOAST column handling (`true` or `false`);
/// a missing header is treated as `false`.
const WEBHOOK_JSON_HANDLE_TOAST_COLUMNS_HEADER: &str = "x-rw-webhook-json-handle-toast-columns";
36
37pub(crate) fn build_json_access_builder(headers: &HeaderMap) -> Result<JsonAccessBuilder> {
38    JsonAccessBuilder::new(json_properties_from_headers(headers)?).map_err(|e| {
39        err(
40            anyhow!(e).context("failed to build webhook JSON decoder"),
41            StatusCode::INTERNAL_SERVER_ERROR,
42        )
43    })
44}
45
46pub(crate) fn owned_row_from_payload_row(
47    access_builder: &mut JsonAccessBuilder,
48    columns: &[WebhookTableColumnDesc],
49    payload_row: &[u8],
50) -> Result<OwnedRow> {
51    let access = access_builder
52        .generate_json_access(payload_row.to_vec())
53        .map_err(|e| {
54            err(
55                anyhow!(e).context("failed to decode webhook JSON payload"),
56                StatusCode::UNPROCESSABLE_ENTITY,
57            )
58        })?;
59    let row = columns
60        .iter()
61        .map(
62            |column| match access.access(&[column.name.as_str()], &column.data_type) {
63                Ok(datum) => Ok(datum.to_owned_datum()),
64                Err(error) if column.is_pk => Err(error),
65                Err(_) => Ok(None),
66            },
67        )
68        .collect::<AccessResult<Vec<_>>>()
69        .map(OwnedRow::new);
70    row.map_err(|e| {
71        err(
72            anyhow!(e).context("failed to decode webhook JSON payload"),
73            StatusCode::UNPROCESSABLE_ENTITY,
74        )
75    })
76}
77
78fn json_properties_from_headers(headers: &HeaderMap) -> Result<JsonProperties> {
79    let format = header_value(headers, WEBHOOK_FORMAT_HEADER)?.unwrap_or_else(|| "plain".into());
80    if !format.eq_ignore_ascii_case("plain") {
81        return Err(err(
82            anyhow!("unsupported webhook payload format `{format}`"),
83            StatusCode::BAD_REQUEST,
84        ));
85    }
86
87    let encode = header_value(headers, WEBHOOK_ENCODE_HEADER)?.unwrap_or_else(|| "json".into());
88    if !encode.eq_ignore_ascii_case("json") {
89        return Err(err(
90            anyhow!("unsupported webhook payload encode `{encode}`"),
91            StatusCode::BAD_REQUEST,
92        ));
93    }
94
95    let timestamp_handling =
96        parse_timestamp_handling(headers, WEBHOOK_JSON_TIMESTAMP_HANDLING_HEADER)?;
97    let timestamptz_handling =
98        parse_timestamptz_handling(headers, WEBHOOK_JSON_TIMESTAMPTZ_HANDLING_HEADER)?;
99    let time_handling = parse_time_handling(headers, WEBHOOK_JSON_TIME_HANDLING_HEADER)?;
100    let bigint_unsigned_handling =
101        parse_bigint_unsigned_handling(headers, WEBHOOK_JSON_BIGINT_UNSIGNED_HANDLING_HEADER)?;
102    let handle_toast_columns =
103        parse_bool_header(headers, WEBHOOK_JSON_HANDLE_TOAST_COLUMNS_HEADER)?.unwrap_or(false);
104
105    Ok(JsonProperties {
106        use_schema_registry: false,
107        timestamp_handling,
108        timestamptz_handling,
109        time_handling,
110        bigint_unsigned_handling,
111        handle_toast_columns,
112    })
113}
114
115fn parse_timestamp_handling(
116    headers: &HeaderMap,
117    key: &'static str,
118) -> Result<Option<TimestampHandling>> {
119    match header_value(headers, key)?.as_deref() {
120        Some("milli") => Ok(Some(TimestampHandling::Milli)),
121        Some("guess_number_unit") => Ok(Some(TimestampHandling::GuessNumberUnit)),
122        Some(value) => Err(err(
123            anyhow!("unrecognized `{key}` value `{value}`"),
124            StatusCode::BAD_REQUEST,
125        )),
126        None => Ok(None),
127    }
128}
129
130fn parse_timestamptz_handling(
131    headers: &HeaderMap,
132    key: &'static str,
133) -> Result<Option<TimestamptzHandling>> {
134    header_value(headers, key)?
135        .as_deref()
136        .map(TimestamptzHandling::from_options)
137        .transpose()
138        .map_err(|e| {
139            err(
140                anyhow!(e).context("invalid webhook JSON decoder option"),
141                StatusCode::BAD_REQUEST,
142            )
143        })
144}
145
146fn parse_time_handling(headers: &HeaderMap, key: &'static str) -> Result<Option<TimeHandling>> {
147    match header_value(headers, key)?.as_deref() {
148        Some("milli") => Ok(Some(TimeHandling::Milli)),
149        Some("micro") => Ok(Some(TimeHandling::Micro)),
150        Some(value) => Err(err(
151            anyhow!("unrecognized `{key}` value `{value}`"),
152            StatusCode::BAD_REQUEST,
153        )),
154        None => Ok(None),
155    }
156}
157
158fn parse_bigint_unsigned_handling(
159    headers: &HeaderMap,
160    key: &'static str,
161) -> Result<Option<BigintUnsignedHandlingMode>> {
162    match header_value(headers, key)?.as_deref() {
163        Some("long") => Ok(Some(BigintUnsignedHandlingMode::Long)),
164        Some("precise") => Ok(Some(BigintUnsignedHandlingMode::Precise)),
165        Some(value) => Err(err(
166            anyhow!("unrecognized `{key}` value `{value}`"),
167            StatusCode::BAD_REQUEST,
168        )),
169        None => Ok(None),
170    }
171}
172
173fn parse_bool_header(headers: &HeaderMap, key: &'static str) -> Result<Option<bool>> {
174    match header_value(headers, key)?.as_deref() {
175        Some("true") => Ok(Some(true)),
176        Some("false") => Ok(Some(false)),
177        Some(value) => Err(err(
178            anyhow!("unrecognized `{key}` value `{value}`"),
179            StatusCode::BAD_REQUEST,
180        )),
181        None => Ok(None),
182    }
183}
184
185fn header_value(headers: &HeaderMap, key: &'static str) -> Result<Option<String>> {
186    headers
187        .get(key)
188        .map(|value| {
189            value.to_str().map(|value| value.to_owned()).map_err(|e| {
190                err(
191                    anyhow!(e).context(format!("invalid UTF-8 in `{key}` header")),
192                    StatusCode::BAD_REQUEST,
193                )
194            })
195        })
196        .transpose()
197}
198
/// Unit tests for header-driven decoder configuration and JSON row decoding.
#[cfg(test)]
mod tests {
    use axum::http::{HeaderMap, HeaderValue};
    use risingwave_common::row::{OwnedRow, Row};
    use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};

    use super::*;

    /// Decodes `payload` against `columns` using a builder created from an
    /// empty header map, i.e. with every decoder option left at its default.
    fn decode_payload_row(columns: &[WebhookTableColumnDesc], payload: &[u8]) -> Result<OwnedRow> {
        let mut access_builder = build_json_access_builder(&HeaderMap::new())?;
        owned_row_from_payload_row(&mut access_builder, columns, payload)
    }

    /// Builds column descriptors from `(name, data_type, is_pk)` tuples.
    fn test_columns(columns: &[(&str, DataType, bool)]) -> Vec<WebhookTableColumnDesc> {
        columns
            .iter()
            .map(|(name, data_type, is_pk)| WebhookTableColumnDesc {
                name: (*name).to_owned(),
                data_type: data_type.clone(),
                is_pk: *is_pk,
            })
            .collect()
    }

    #[test]
    fn test_parse_plain_json_config_defaults() {
        let config = json_properties_from_headers(&HeaderMap::new()).unwrap();
        assert!(config.timestamptz_handling.is_none());
    }

    #[test]
    fn test_parse_plain_json_config_rejects_unsupported_encode() {
        let mut headers = HeaderMap::new();
        headers.insert(WEBHOOK_ENCODE_HEADER, HeaderValue::from_static("csv"));
        let err = json_properties_from_headers(&headers).unwrap_err();
        assert_eq!(err.code(), StatusCode::BAD_REQUEST);
    }

    #[test]
    fn test_parse_plain_json_config_accepts_timestamptz_mode() {
        let mut headers = HeaderMap::new();
        headers.insert(
            WEBHOOK_JSON_TIMESTAMPTZ_HANDLING_HEADER,
            HeaderValue::from_static("milli"),
        );
        let config = json_properties_from_headers(&headers).unwrap();
        assert!(matches!(
            config.timestamptz_handling,
            Some(TimestamptzHandling::Milli)
        ));
    }

    #[test]
    fn test_parse_plain_json_config_accepts_all_supported_modes() {
        let mut headers = HeaderMap::new();
        headers.insert(
            WEBHOOK_JSON_TIMESTAMP_HANDLING_HEADER,
            HeaderValue::from_static("milli"),
        );
        headers.insert(
            WEBHOOK_JSON_TIMESTAMPTZ_HANDLING_HEADER,
            HeaderValue::from_static("micro"),
        );
        headers.insert(
            WEBHOOK_JSON_TIME_HANDLING_HEADER,
            HeaderValue::from_static("milli"),
        );
        headers.insert(
            WEBHOOK_JSON_BIGINT_UNSIGNED_HANDLING_HEADER,
            HeaderValue::from_static("precise"),
        );
        headers.insert(
            WEBHOOK_JSON_HANDLE_TOAST_COLUMNS_HEADER,
            HeaderValue::from_static("true"),
        );

        let config = json_properties_from_headers(&headers).unwrap();
        assert!(matches!(
            config.timestamp_handling,
            Some(TimestampHandling::Milli)
        ));
        assert!(matches!(
            config.timestamptz_handling,
            Some(TimestamptzHandling::Micro)
        ));
        assert!(matches!(config.time_handling, Some(TimeHandling::Milli)));
        assert!(matches!(
            config.bigint_unsigned_handling,
            Some(BigintUnsignedHandlingMode::Precise)
        ));
        assert!(config.handle_toast_columns);
    }

    #[test]
    fn test_parse_plain_json_config_rejects_invalid_supported_modes() {
        // Each option header is checked in isolation so a failure names the
        // header that was wrongly accepted.
        for header in [
            WEBHOOK_JSON_TIMESTAMP_HANDLING_HEADER,
            WEBHOOK_JSON_TIMESTAMPTZ_HANDLING_HEADER,
            WEBHOOK_JSON_TIME_HANDLING_HEADER,
            WEBHOOK_JSON_BIGINT_UNSIGNED_HANDLING_HEADER,
            WEBHOOK_JSON_HANDLE_TOAST_COLUMNS_HEADER,
        ] {
            let mut headers = HeaderMap::new();
            headers.insert(header, HeaderValue::from_static("invalid"));
            let err = json_properties_from_headers(&headers).unwrap_err();
            assert_eq!(err.code(), StatusCode::BAD_REQUEST, "{header}");
        }
    }

    #[test]
    fn test_decode_plain_json_payload() {
        let columns = test_columns(&[
            ("id", DataType::Int32, true),
            ("name", DataType::Varchar, false),
        ]);
        let row = decode_payload_row(&columns, br#"{"id":1,"name":"alice"}"#).unwrap();

        assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
        assert_eq!(
            row.datum_at(1).to_owned_datum(),
            Some(ScalarImpl::Utf8("alice".into()))
        );
    }

    #[test]
    fn test_decode_plain_json_payload_rejects_missing_pk() {
        let columns = test_columns(&[
            ("id", DataType::Int32, true),
            ("name", DataType::Varchar, false),
        ]);
        let err = decode_payload_row(&columns, br#"{"name":"alice"}"#).unwrap_err();
        assert_eq!(err.code(), StatusCode::UNPROCESSABLE_ENTITY);
    }

    #[test]
    fn test_decode_plain_json_payload_missing_non_pk_column_is_null() {
        // A non-key column absent from the payload must not fail the row;
        // it is stored as NULL instead.
        let columns = test_columns(&[
            ("id", DataType::Int32, true),
            ("name", DataType::Varchar, false),
        ]);
        let row = decode_payload_row(&columns, br#"{"id":1}"#).unwrap();

        assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
        assert_eq!(row.datum_at(1).to_owned_datum(), None);
    }

    #[test]
    fn test_decode_single_json_object_payload() {
        let columns = test_columns(&[
            ("id", DataType::Int32, true),
            ("price", DataType::Decimal, false),
            ("created_at", DataType::Timestamp, false),
            ("name", DataType::Varchar, false),
        ]);
        let row = decode_payload_row(
            &columns,
            br#"{"id":1,"price":"19.99","created_at":"2026-04-15 10:00:00","name":"alice"}"#,
        )
        .unwrap();

        assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
        assert!(matches!(
            row.datum_at(1).to_owned_datum(),
            Some(ScalarImpl::Decimal(_))
        ));
        assert!(matches!(
            row.datum_at(2).to_owned_datum(),
            Some(ScalarImpl::Timestamp(_))
        ));
        assert_eq!(
            row.datum_at(3).to_owned_datum(),
            Some(ScalarImpl::Utf8("alice".into()))
        );
    }

    #[test]
    fn test_decode_single_json_object_payload_rejects_missing_pk() {
        // Same schema as `test_decode_single_json_object_payload`, with only
        // the primary key left out of the payload.
        let columns = test_columns(&[
            ("id", DataType::Int32, true),
            ("price", DataType::Decimal, false),
            ("created_at", DataType::Timestamp, false),
            ("name", DataType::Varchar, false),
        ]);
        let err = decode_payload_row(
            &columns,
            br#"{"price":"19.99","created_at":"2026-04-15 10:00:00","name":"alice"}"#,
        )
        .unwrap_err();
        assert_eq!(err.code(), StatusCode::UNPROCESSABLE_ENTITY);
    }
}