risingwave_frontend/handler/
kill_process.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::error::tonic::TonicStatusWrapper;
17use risingwave_pb::frontend_service::CancelRunningSqlRequest;
18
19use crate::error::{ErrorCode, Result};
20use crate::handler::{HandlerArgs, RwPgResponse};
21use crate::session::{SessionMapRef, WorkerProcessId, cancel_queries_in_session};
22
23pub(super) async fn handle_kill(handler_args: HandlerArgs, s: String) -> Result<RwPgResponse> {
24    let worker_process_id =
25        WorkerProcessId::try_from(s).map_err(ErrorCode::InvalidParameterValue)?;
26    let env = handler_args.session.env();
27    let this_worker_id = env.meta_client_ref().worker_id();
28    if this_worker_id == worker_process_id.worker_id {
29        return handle_kill_local(
30            handler_args.session.env().sessions_map().clone(),
31            worker_process_id.process_id,
32        )
33        .await;
34    }
35    let Some(worker) = handler_args
36        .session
37        .env()
38        .worker_node_manager_ref()
39        .worker_node(worker_process_id.worker_id)
40    else {
41        return Err(ErrorCode::InvalidParameterValue(format!(
42            "worker {} not found",
43            worker_process_id.worker_id
44        ))
45        .into());
46    };
47    let frontend_client = env.frontend_client_pool().get(&worker).await?;
48    frontend_client
49        .cancel_running_sql(CancelRunningSqlRequest {
50            process_id: worker_process_id.process_id,
51        })
52        .await
53        .map_err(TonicStatusWrapper::from)?;
54    Ok(PgResponse::empty_result(StatementType::KILL))
55}
56
57pub async fn handle_kill_local(
58    sessions_map: SessionMapRef,
59    process_id: i32,
60) -> Result<RwPgResponse> {
61    // Process id and secret key in session id are the same in RisingWave.
62    let session_id = (process_id, process_id);
63    tracing::trace!("kill query in session: {:?}", session_id);
64    // TODO: cancel queries with await.
65    let session_exists = cancel_queries_in_session(session_id, sessions_map.clone());
66    let session = { sessions_map.read().get(&session_id).cloned() };
67    let Some(session) = session else {
68        if session_exists {
69            return Ok(PgResponse::empty_result(StatementType::KILL));
70        }
71        return Err(ErrorCode::SessionNotFound.into());
72    };
73    session
74        .env()
75        .creating_streaming_job_tracker()
76        .cancel_jobs(session_id)
77        .await
78        .map_err(crate::error::RwError::from)?;
79    Ok(PgResponse::empty_result(StatementType::KILL))
80}