risingwave_frontend/handler/
kill_process.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::error::tonic::TonicStatusWrapper;
17use risingwave_pb::frontend_service::CancelRunningSqlRequest;
18
19use crate::error::{ErrorCode, Result};
20use crate::handler::{HandlerArgs, RwPgResponse};
21use crate::session::{
22    SessionMapRef, WorkerProcessId, cancel_creating_jobs_in_session, cancel_queries_in_session,
23};
24
25pub(super) async fn handle_kill(handler_args: HandlerArgs, s: String) -> Result<RwPgResponse> {
26    let worker_process_id =
27        WorkerProcessId::try_from(s).map_err(ErrorCode::InvalidParameterValue)?;
28    let env = handler_args.session.env();
29    let this_worker_id = env.meta_client_ref().worker_id();
30    if this_worker_id == worker_process_id.worker_id {
31        return handle_kill_local(
32            handler_args.session.env().sessions_map().clone(),
33            worker_process_id.process_id,
34        )
35        .await;
36    }
37    let Some(worker) = handler_args
38        .session
39        .env()
40        .worker_node_manager_ref()
41        .worker_node(worker_process_id.worker_id)
42    else {
43        return Err(ErrorCode::InvalidParameterValue(format!(
44            "worker {} not found",
45            worker_process_id.worker_id
46        ))
47        .into());
48    };
49    let frontend_client = env.frontend_client_pool().get(&worker).await?;
50    frontend_client
51        .cancel_running_sql(CancelRunningSqlRequest {
52            process_id: worker_process_id.process_id,
53        })
54        .await
55        .map_err(TonicStatusWrapper::from)?;
56    Ok(PgResponse::empty_result(StatementType::KILL))
57}
58
59pub async fn handle_kill_local(
60    sessions_map: SessionMapRef,
61    process_id: i32,
62) -> Result<RwPgResponse> {
63    // Process id and secret key in session id are the same in RisingWave.
64    let session_id = (process_id, process_id);
65    tracing::trace!("kill query in session: {:?}", session_id);
66    // TODO: cancel queries with await.
67    let mut session_exists = cancel_queries_in_session(session_id, sessions_map.clone());
68    session_exists |= cancel_creating_jobs_in_session(session_id, sessions_map);
69
70    if session_exists {
71        Ok(PgResponse::empty_result(StatementType::KILL))
72    } else {
73        Err(ErrorCode::SessionNotFound.into())
74    }
75}