risingwave_frontend/webhook/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU32;
18
19use anyhow::{Context, anyhow};
20use axum::Router;
21use axum::body::Bytes;
22use axum::extract::{Extension, Path};
23use axum::http::{HeaderMap, Method, StatusCode};
24use axum::routing::post;
25use risingwave_common::array::{Array, ArrayBuilder, DataChunk};
26use risingwave_common::secret::LocalSecretManager;
27use risingwave_common::types::{DataType, JsonbVal, Scalar};
28use risingwave_pb::catalog::WebhookSourceInfo;
29use risingwave_pb::task_service::{FastInsertRequest, FastInsertResponse};
30use tokio::net::TcpListener;
31use tower::ServiceBuilder;
32use tower_http::add_extension::AddExtensionLayer;
33use tower_http::compression::CompressionLayer;
34use tower_http::cors::{self, CorsLayer};
35
36use crate::webhook::utils::{Result, err};
37mod utils;
38use risingwave_rpc_client::ComputeClient;
39
40pub type Service = Arc<WebhookService>;
41
42// We always use the `root` user to connect to the database to allow the webhook service to access all tables.
43const USER: &str = "root";
44
45#[derive(Clone)]
46pub struct FastInsertContext {
47    pub webhook_source_info: WebhookSourceInfo,
48    pub fast_insert_request: FastInsertRequest,
49    pub compute_client: ComputeClient,
50}
51
/// HTTP service that accepts webhook `POST` requests and inserts their payloads
/// into tables.
pub struct WebhookService {
    /// Socket address the listener is bound to when the service is served.
    webhook_addr: SocketAddr,
    /// Incremented once per incoming request; the previous value becomes that
    /// request's id (wraps around on overflow).
    counter: AtomicU32,
}
56
57pub(super) mod handlers {
58    use jsonbb::Value;
59    use risingwave_common::array::JsonbArrayBuilder;
60    use risingwave_common::session_config::SearchPath;
61    use risingwave_pb::catalog::WebhookSourceInfo;
62    use risingwave_pb::task_service::fast_insert_response;
63    use utils::{header_map_to_json, verify_signature};
64
65    use super::*;
66    use crate::catalog::root_catalog::SchemaPath;
67    use crate::scheduler::choose_fast_insert_client;
68    use crate::session::SESSION_MANAGER;
69
70    pub async fn handle_post_request(
71        Extension(srv): Extension<Service>,
72        headers: HeaderMap,
73        Path((database, schema, table)): Path<(String, String, String)>,
74        body: Bytes,
75    ) -> Result<()> {
76        let request_id = srv
77            .counter
78            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
79        let FastInsertContext {
80            webhook_source_info,
81            mut fast_insert_request,
82            compute_client,
83        } = acquire_table_info(request_id, &database, &schema, &table).await?;
84
85        let WebhookSourceInfo {
86            signature_expr,
87            secret_ref,
88            wait_for_persistence: _,
89        } = webhook_source_info;
90
91        let secret_string = if let Some(secret_ref) = secret_ref {
92            LocalSecretManager::global()
93                .fill_secret(secret_ref)
94                .map_err(|e| err(e, StatusCode::NOT_FOUND))?
95        } else {
96            String::new()
97        };
98
99        // Once limitation here is that the key is no longer case-insensitive, users must user the lowercase key when defining the webhook source table.
100        let headers_jsonb = header_map_to_json(&headers);
101
102        // verify the signature
103        let is_valid = verify_signature(
104            headers_jsonb,
105            secret_string.as_str(),
106            body.as_ref(),
107            signature_expr.unwrap(),
108        )
109        .await?;
110
111        if !is_valid {
112            return Err(err(
113                anyhow!("Signature verification failed"),
114                StatusCode::UNAUTHORIZED,
115            ));
116        }
117
118        // Use builder to obtain a single column & single row DataChunk
119        let mut builder = JsonbArrayBuilder::with_type(1, DataType::Jsonb);
120        let json_value = Value::from_text(&body).map_err(|e| {
121            err(
122                anyhow!(e).context("Failed to parse body"),
123                StatusCode::UNPROCESSABLE_ENTITY,
124            )
125        })?;
126        let jsonb_val = JsonbVal::from(json_value);
127        builder.append(Some(jsonb_val.as_scalar_ref()));
128        let data_chunk = DataChunk::new(vec![builder.finish().into_ref()], 1);
129
130        // fill the data_chunk
131        fast_insert_request.data_chunk = Some(data_chunk.to_protobuf());
132        // execute on the compute node
133        let res = execute(fast_insert_request, compute_client).await?;
134
135        if res.status == fast_insert_response::Status::Succeeded as i32 {
136            Ok(())
137        } else {
138            Err(err(
139                anyhow!("Failed to fast insert: {}", res.error_message),
140                StatusCode::INTERNAL_SERVER_ERROR,
141            ))
142        }
143    }
144
145    async fn acquire_table_info(
146        request_id: u32,
147        database: &String,
148        schema: &String,
149        table: &String,
150    ) -> Result<FastInsertContext> {
151        let session_mgr = SESSION_MANAGER
152            .get()
153            .expect("session manager has been initialized");
154
155        let frontend_env = session_mgr.env();
156
157        let search_path = SearchPath::default();
158        let schema_path = SchemaPath::new(Some(schema.as_str()), &search_path, USER);
159
160        let (webhook_source_info, table_id, version_id, row_id_index) = {
161            let reader = frontend_env.catalog_reader().read_guard();
162            let (table_catalog, _schema) = reader
163                .get_any_table_by_name(database.as_str(), schema_path, table)
164                .map_err(|e| err(e, StatusCode::NOT_FOUND))?;
165
166            let webhook_source_info = table_catalog
167                .webhook_info
168                .as_ref()
169                .ok_or_else(|| {
170                    err(
171                        anyhow!("Table `{}` is not with webhook source", table),
172                        StatusCode::FORBIDDEN,
173                    )
174                })?
175                .clone();
176            (
177                webhook_source_info,
178                table_catalog.id(),
179                table_catalog.version_id().expect("table must be versioned"),
180                table_catalog.row_id_index.map(|idx| idx as u32),
181            )
182        };
183
184        let fast_insert_request = FastInsertRequest {
185            table_id: table_id.table_id,
186            table_version_id: version_id,
187            column_indices: vec![0],
188            // leave the data_chunk empty for now
189            data_chunk: None,
190            row_id_index,
191            request_id,
192            wait_for_persistence: webhook_source_info.wait_for_persistence,
193        };
194
195        let compute_client = choose_fast_insert_client(&table_id, frontend_env, request_id)
196            .await
197            .unwrap();
198
199        Ok(FastInsertContext {
200            webhook_source_info,
201            fast_insert_request,
202            compute_client,
203        })
204    }
205
206    async fn execute(
207        request: FastInsertRequest,
208        client: ComputeClient,
209    ) -> Result<FastInsertResponse> {
210        let response = client.fast_insert(request).await.map_err(|e| {
211            err(
212                anyhow!(e).context("Failed to execute on compute node"),
213                StatusCode::INTERNAL_SERVER_ERROR,
214            )
215        })?;
216        Ok(response)
217    }
218}
219
220impl WebhookService {
221    pub fn new(webhook_addr: SocketAddr) -> Self {
222        Self {
223            webhook_addr,
224            counter: AtomicU32::new(0),
225        }
226    }
227
228    pub async fn serve(self) -> anyhow::Result<()> {
229        use handlers::*;
230        let srv = Arc::new(self);
231
232        let cors_layer = CorsLayer::new()
233            .allow_origin(cors::Any)
234            .allow_methods(vec![Method::POST]);
235
236        let api_router: Router = Router::new()
237            .route("/:database/:schema/:table", post(handle_post_request))
238            .layer(
239                ServiceBuilder::new()
240                    .layer(AddExtensionLayer::new(srv.clone()))
241                    .into_inner(),
242            )
243            .layer(cors_layer);
244
245        let app: Router = Router::new()
246            .nest("/webhook", api_router)
247            .layer(CompressionLayer::new());
248
249        let listener = TcpListener::bind(&srv.webhook_addr)
250            .await
251            .context("Failed to bind dashboard address")?;
252
253        #[cfg(not(madsim))]
254        axum::serve(listener, app)
255            .await
256            .context("Failed to serve dashboard service")?;
257
258        Ok(())
259    }
260}
261
#[cfg(test)]
mod tests {
    use std::net::{Ipv4Addr, SocketAddr};

    use crate::webhook::WebhookService;

    /// Manual smoke test: serves the webhook service on `127.0.0.1:4560`.
    ///
    /// Ignored by default because it only returns once serving stops.
    #[tokio::test]
    #[ignore]
    async fn test_webhook_server() -> anyhow::Result<()> {
        let listen_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 4560));
        WebhookService::new(listen_addr).serve().await
    }
}
274}