risingwave_frontend/webhook/
utils.rs1use std::collections::HashMap;
16
17use anyhow::anyhow;
18use axum::Json;
19use axum::http::{HeaderMap, StatusCode};
20use axum::response::IntoResponse;
21use risingwave_common::row::OwnedRow;
22use risingwave_common::types::JsonbVal;
23use risingwave_pb::expr::ExprNode;
24use serde_json::json;
25use thiserror_ext::AsReport;
26
27use crate::expr::ExprImpl;
28
29pub struct WebhookError {
30 err: anyhow::Error,
31 code: StatusCode,
32}
33
34pub(crate) type Result<T> = std::result::Result<T, WebhookError>;
35
36pub(crate) fn err(err: impl Into<anyhow::Error>, code: StatusCode) -> WebhookError {
37 WebhookError {
38 err: err.into(),
39 code,
40 }
41}
42
43impl From<anyhow::Error> for WebhookError {
44 fn from(value: anyhow::Error) -> Self {
45 WebhookError {
46 err: value,
47 code: StatusCode::INTERNAL_SERVER_ERROR,
48 }
49 }
50}
51
52impl IntoResponse for WebhookError {
53 fn into_response(self) -> axum::response::Response {
54 let mut resp = Json(json!({
55 "error": self.err.to_report_string(),
56 }))
57 .into_response();
58 *resp.status_mut() = self.code;
59 resp
60 }
61}
62
63pub(crate) fn header_map_to_json(headers: &HeaderMap) -> JsonbVal {
64 let mut header_map = HashMap::new();
65
66 for (key, value) in headers {
67 let key = key.as_str().to_owned();
68 let value = value.to_str().unwrap_or("").to_owned();
69 header_map.insert(key, value);
70 }
71
72 let json_value = json!(header_map);
73 JsonbVal::from(json_value)
74}
75
76pub(crate) async fn verify_signature(
77 headers_jsonb: JsonbVal,
78 secret: &str,
79 payload: &[u8],
80 signature_expr: ExprNode,
81) -> Result<bool> {
82 let row = OwnedRow::new(vec![
83 Some(headers_jsonb.into()),
84 Some(secret.into()),
85 Some(payload.into()),
86 ]);
87
88 let signature_expr_impl = ExprImpl::from_expr_proto(&signature_expr)
89 .map_err(|e| err(e, StatusCode::INTERNAL_SERVER_ERROR))?;
90
91 let result = signature_expr_impl
92 .eval_row(&row)
93 .await
94 .map_err(|e| {
95 tracing::error!(error = %e.as_report(), "Fail to validate for webhook events.");
96 err(e, StatusCode::INTERNAL_SERVER_ERROR)
97 })?
98 .ok_or_else(|| {
99 err(
100 anyhow!("`SECURE_COMPARE()` failed"),
101 StatusCode::BAD_REQUEST,
102 )
103 })?;
104 Ok(*result.as_bool())
105}