1use anyhow::anyhow;
16use axum::http::{HeaderMap, StatusCode};
17use risingwave_common::row::OwnedRow;
18use risingwave_common::types::ToOwnedDatum;
19use risingwave_connector::parser::{
20 Access, AccessResult, BigintUnsignedHandlingMode, JsonAccessBuilder, JsonProperties,
21 TimeHandling, TimestampHandling, TimestamptzHandling,
22};
23
24use super::WebhookTableColumnDesc;
25use super::utils::{Result, err};
26
/// Request header selecting the payload format. Only `plain` (case-insensitive) is accepted;
/// it is also the default when the header is absent.
const WEBHOOK_FORMAT_HEADER: &str = "x-rw-webhook-format";
/// Request header selecting the payload encoding. Only `json` (case-insensitive) is accepted;
/// it is also the default when the header is absent.
const WEBHOOK_ENCODE_HEADER: &str = "x-rw-webhook-encode";
/// Optional request header choosing how JSON timestamps are decoded
/// (`milli` or `guess_number_unit`).
const WEBHOOK_JSON_TIMESTAMP_HANDLING_HEADER: &str = "x-rw-webhook-json-timestamp-handling-mode";
/// Optional request header choosing how JSON timestamptz values are decoded. The value is
/// validated by `TimestamptzHandling::from_options`.
const WEBHOOK_JSON_TIMESTAMPTZ_HANDLING_HEADER: &str =
    "x-rw-webhook-json-timestamptz-handling-mode";
/// Optional request header choosing how JSON time values are decoded (`milli` or `micro`).
const WEBHOOK_JSON_TIME_HANDLING_HEADER: &str = "x-rw-webhook-json-time-handling-mode";
/// Optional request header choosing how unsigned bigint values are decoded
/// (`long` or `precise`).
const WEBHOOK_JSON_BIGINT_UNSIGNED_HANDLING_HEADER: &str =
    "x-rw-webhook-json-bigint-unsigned-handling-mode";
/// Optional request header toggling TOAST column handling (`true` or `false`; treated as
/// `false` when absent).
const WEBHOOK_JSON_HANDLE_TOAST_COLUMNS_HEADER: &str = "x-rw-webhook-json-handle-toast-columns";
36
37pub(crate) fn build_json_access_builder(headers: &HeaderMap) -> Result<JsonAccessBuilder> {
38 JsonAccessBuilder::new(json_properties_from_headers(headers)?).map_err(|e| {
39 err(
40 anyhow!(e).context("failed to build webhook JSON decoder"),
41 StatusCode::INTERNAL_SERVER_ERROR,
42 )
43 })
44}
45
46pub(crate) fn owned_row_from_payload_row(
47 access_builder: &mut JsonAccessBuilder,
48 columns: &[WebhookTableColumnDesc],
49 payload_row: &[u8],
50) -> Result<OwnedRow> {
51 let access = access_builder
52 .generate_json_access(payload_row.to_vec())
53 .map_err(|e| {
54 err(
55 anyhow!(e).context("failed to decode webhook JSON payload"),
56 StatusCode::UNPROCESSABLE_ENTITY,
57 )
58 })?;
59 let row = columns
60 .iter()
61 .map(
62 |column| match access.access(&[column.name.as_str()], &column.data_type) {
63 Ok(datum) => Ok(datum.to_owned_datum()),
64 Err(error) if column.is_pk => Err(error),
65 Err(_) => Ok(None),
66 },
67 )
68 .collect::<AccessResult<Vec<_>>>()
69 .map(OwnedRow::new);
70 row.map_err(|e| {
71 err(
72 anyhow!(e).context("failed to decode webhook JSON payload"),
73 StatusCode::UNPROCESSABLE_ENTITY,
74 )
75 })
76}
77
78fn json_properties_from_headers(headers: &HeaderMap) -> Result<JsonProperties> {
79 let format = header_value(headers, WEBHOOK_FORMAT_HEADER)?.unwrap_or_else(|| "plain".into());
80 if !format.eq_ignore_ascii_case("plain") {
81 return Err(err(
82 anyhow!("unsupported webhook payload format `{format}`"),
83 StatusCode::BAD_REQUEST,
84 ));
85 }
86
87 let encode = header_value(headers, WEBHOOK_ENCODE_HEADER)?.unwrap_or_else(|| "json".into());
88 if !encode.eq_ignore_ascii_case("json") {
89 return Err(err(
90 anyhow!("unsupported webhook payload encode `{encode}`"),
91 StatusCode::BAD_REQUEST,
92 ));
93 }
94
95 let timestamp_handling =
96 parse_timestamp_handling(headers, WEBHOOK_JSON_TIMESTAMP_HANDLING_HEADER)?;
97 let timestamptz_handling =
98 parse_timestamptz_handling(headers, WEBHOOK_JSON_TIMESTAMPTZ_HANDLING_HEADER)?;
99 let time_handling = parse_time_handling(headers, WEBHOOK_JSON_TIME_HANDLING_HEADER)?;
100 let bigint_unsigned_handling =
101 parse_bigint_unsigned_handling(headers, WEBHOOK_JSON_BIGINT_UNSIGNED_HANDLING_HEADER)?;
102 let handle_toast_columns =
103 parse_bool_header(headers, WEBHOOK_JSON_HANDLE_TOAST_COLUMNS_HEADER)?.unwrap_or(false);
104
105 Ok(JsonProperties {
106 use_schema_registry: false,
107 timestamp_handling,
108 timestamptz_handling,
109 time_handling,
110 bigint_unsigned_handling,
111 handle_toast_columns,
112 })
113}
114
115fn parse_timestamp_handling(
116 headers: &HeaderMap,
117 key: &'static str,
118) -> Result<Option<TimestampHandling>> {
119 match header_value(headers, key)?.as_deref() {
120 Some("milli") => Ok(Some(TimestampHandling::Milli)),
121 Some("guess_number_unit") => Ok(Some(TimestampHandling::GuessNumberUnit)),
122 Some(value) => Err(err(
123 anyhow!("unrecognized `{key}` value `{value}`"),
124 StatusCode::BAD_REQUEST,
125 )),
126 None => Ok(None),
127 }
128}
129
130fn parse_timestamptz_handling(
131 headers: &HeaderMap,
132 key: &'static str,
133) -> Result<Option<TimestamptzHandling>> {
134 header_value(headers, key)?
135 .as_deref()
136 .map(TimestamptzHandling::from_options)
137 .transpose()
138 .map_err(|e| {
139 err(
140 anyhow!(e).context("invalid webhook JSON decoder option"),
141 StatusCode::BAD_REQUEST,
142 )
143 })
144}
145
146fn parse_time_handling(headers: &HeaderMap, key: &'static str) -> Result<Option<TimeHandling>> {
147 match header_value(headers, key)?.as_deref() {
148 Some("milli") => Ok(Some(TimeHandling::Milli)),
149 Some("micro") => Ok(Some(TimeHandling::Micro)),
150 Some(value) => Err(err(
151 anyhow!("unrecognized `{key}` value `{value}`"),
152 StatusCode::BAD_REQUEST,
153 )),
154 None => Ok(None),
155 }
156}
157
158fn parse_bigint_unsigned_handling(
159 headers: &HeaderMap,
160 key: &'static str,
161) -> Result<Option<BigintUnsignedHandlingMode>> {
162 match header_value(headers, key)?.as_deref() {
163 Some("long") => Ok(Some(BigintUnsignedHandlingMode::Long)),
164 Some("precise") => Ok(Some(BigintUnsignedHandlingMode::Precise)),
165 Some(value) => Err(err(
166 anyhow!("unrecognized `{key}` value `{value}`"),
167 StatusCode::BAD_REQUEST,
168 )),
169 None => Ok(None),
170 }
171}
172
173fn parse_bool_header(headers: &HeaderMap, key: &'static str) -> Result<Option<bool>> {
174 match header_value(headers, key)?.as_deref() {
175 Some("true") => Ok(Some(true)),
176 Some("false") => Ok(Some(false)),
177 Some(value) => Err(err(
178 anyhow!("unrecognized `{key}` value `{value}`"),
179 StatusCode::BAD_REQUEST,
180 )),
181 None => Ok(None),
182 }
183}
184
185fn header_value(headers: &HeaderMap, key: &'static str) -> Result<Option<String>> {
186 headers
187 .get(key)
188 .map(|value| {
189 value.to_str().map(|value| value.to_owned()).map_err(|e| {
190 err(
191 anyhow!(e).context(format!("invalid UTF-8 in `{key}` header")),
192 StatusCode::BAD_REQUEST,
193 )
194 })
195 })
196 .transpose()
197}
198
#[cfg(test)]
mod tests {
    use axum::http::{HeaderMap, HeaderValue};
    use risingwave_common::row::{OwnedRow, Row};
    use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};

    use super::*;

    /// Decodes `payload` against `columns` using a builder configured from empty headers,
    /// i.e. with every decoder option left at its default.
    fn decode_payload_row(columns: &[WebhookTableColumnDesc], payload: &[u8]) -> Result<OwnedRow> {
        let mut access_builder = build_json_access_builder(&HeaderMap::new())?;
        owned_row_from_payload_row(&mut access_builder, columns, payload)
    }

    /// Builds column descriptors from `(name, data type, is primary key)` triples.
    fn test_columns(columns: &[(&str, DataType, bool)]) -> Vec<WebhookTableColumnDesc> {
        columns
            .iter()
            .map(|(name, data_type, is_pk)| WebhookTableColumnDesc {
                name: (*name).to_owned(),
                data_type: data_type.clone(),
                is_pk: *is_pk,
            })
            .collect()
    }

    #[test]
    fn test_parse_plain_json_config_defaults() {
        let config = json_properties_from_headers(&HeaderMap::new()).unwrap();
        assert!(config.timestamptz_handling.is_none());
    }

    #[test]
    fn test_parse_plain_json_config_rejects_unsupported_encode() {
        let mut headers = HeaderMap::new();
        headers.insert(WEBHOOK_ENCODE_HEADER, HeaderValue::from_static("csv"));
        let err = json_properties_from_headers(&headers).unwrap_err();
        assert_eq!(err.code(), StatusCode::BAD_REQUEST);
    }

    #[test]
    fn test_parse_plain_json_config_accepts_timestamptz_mode() {
        let mut headers = HeaderMap::new();
        headers.insert(
            WEBHOOK_JSON_TIMESTAMPTZ_HANDLING_HEADER,
            HeaderValue::from_static("milli"),
        );
        let config = json_properties_from_headers(&headers).unwrap();
        assert!(matches!(
            config.timestamptz_handling,
            Some(TimestamptzHandling::Milli)
        ));
    }

    #[test]
    fn test_parse_plain_json_config_accepts_all_supported_modes() {
        let mut headers = HeaderMap::new();
        headers.insert(
            WEBHOOK_JSON_TIMESTAMP_HANDLING_HEADER,
            HeaderValue::from_static("milli"),
        );
        headers.insert(
            WEBHOOK_JSON_TIMESTAMPTZ_HANDLING_HEADER,
            HeaderValue::from_static("micro"),
        );
        headers.insert(
            WEBHOOK_JSON_TIME_HANDLING_HEADER,
            HeaderValue::from_static("milli"),
        );
        headers.insert(
            WEBHOOK_JSON_BIGINT_UNSIGNED_HANDLING_HEADER,
            HeaderValue::from_static("precise"),
        );
        headers.insert(
            WEBHOOK_JSON_HANDLE_TOAST_COLUMNS_HEADER,
            HeaderValue::from_static("true"),
        );

        let config = json_properties_from_headers(&headers).unwrap();
        assert!(matches!(
            config.timestamp_handling,
            Some(TimestampHandling::Milli)
        ));
        assert!(matches!(
            config.timestamptz_handling,
            Some(TimestamptzHandling::Micro)
        ));
        assert!(matches!(config.time_handling, Some(TimeHandling::Milli)));
        assert!(matches!(
            config.bigint_unsigned_handling,
            Some(BigintUnsignedHandlingMode::Precise)
        ));
        assert!(config.handle_toast_columns);
    }

    #[test]
    fn test_parse_plain_json_config_rejects_invalid_supported_modes() {
        for header in [
            WEBHOOK_JSON_TIMESTAMP_HANDLING_HEADER,
            WEBHOOK_JSON_TIMESTAMPTZ_HANDLING_HEADER,
            WEBHOOK_JSON_TIME_HANDLING_HEADER,
            WEBHOOK_JSON_BIGINT_UNSIGNED_HANDLING_HEADER,
            WEBHOOK_JSON_HANDLE_TOAST_COLUMNS_HEADER,
        ] {
            let mut headers = HeaderMap::new();
            headers.insert(header, HeaderValue::from_static("invalid"));
            let err = json_properties_from_headers(&headers).unwrap_err();
            assert_eq!(err.code(), StatusCode::BAD_REQUEST, "{header}");
        }
    }

    #[test]
    fn test_decode_plain_json_payload() {
        let columns = test_columns(&[
            ("id", DataType::Int32, true),
            ("name", DataType::Varchar, false),
        ]);
        let row = decode_payload_row(&columns, br#"{"id":1,"name":"alice"}"#).unwrap();

        assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
        assert_eq!(
            row.datum_at(1).to_owned_datum(),
            Some(ScalarImpl::Utf8("alice".into()))
        );
    }

    #[test]
    fn test_decode_plain_json_payload_rejects_missing_pk() {
        let columns = test_columns(&[
            ("id", DataType::Int32, true),
            ("name", DataType::Varchar, false),
        ]);
        let err = decode_payload_row(&columns, br#"{"name":"alice"}"#).unwrap_err();
        assert_eq!(err.code(), StatusCode::UNPROCESSABLE_ENTITY);
    }

    // Covers the non-primary-key fallback: a column absent from the payload becomes NULL
    // instead of failing the row.
    #[test]
    fn test_decode_plain_json_payload_nulls_missing_non_pk_column() {
        let columns = test_columns(&[
            ("id", DataType::Int32, true),
            ("name", DataType::Varchar, false),
        ]);
        let row = decode_payload_row(&columns, br#"{"id":1}"#).unwrap();

        assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
        assert_eq!(row.datum_at(1).to_owned_datum(), None);
    }

    #[test]
    fn test_decode_single_json_object_payload() {
        let columns = test_columns(&[
            ("id", DataType::Int32, true),
            ("price", DataType::Decimal, false),
            ("created_at", DataType::Timestamp, false),
            ("name", DataType::Varchar, false),
        ]);
        let row = decode_payload_row(
            &columns,
            br#"{"id":1,"price":"19.99","created_at":"2026-04-15 10:00:00","name":"alice"}"#,
        )
        .unwrap();

        assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
        assert!(matches!(
            row.datum_at(1).to_owned_datum(),
            Some(ScalarImpl::Decimal(_))
        ));
        assert!(matches!(
            row.datum_at(2).to_owned_datum(),
            Some(ScalarImpl::Timestamp(_))
        ));
        assert_eq!(
            row.datum_at(3).to_owned_datum(),
            Some(ScalarImpl::Utf8("alice".into()))
        );
    }

    // Uses the multi-column schema so this is not a copy of the two-column missing-PK test:
    // the other fields are all present and valid, yet the missing key still rejects the row.
    #[test]
    fn test_decode_single_json_object_payload_rejects_missing_pk() {
        let columns = test_columns(&[
            ("id", DataType::Int32, true),
            ("price", DataType::Decimal, false),
            ("created_at", DataType::Timestamp, false),
            ("name", DataType::Varchar, false),
        ]);
        let err = decode_payload_row(
            &columns,
            br#"{"price":"19.99","created_at":"2026-04-15 10:00:00","name":"alice"}"#,
        )
        .unwrap_err();
        assert_eq!(err.code(), StatusCode::UNPROCESSABLE_ENTITY);
    }
}