risingwave_frontend/handler/
kill_process.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::error::tonic::TonicStatusWrapper;
17use risingwave_pb::frontend_service::CancelRunningSqlRequest;
18
19use crate::error::{ErrorCode, Result};
20use crate::handler::{HandlerArgs, RwPgResponse};
21use crate::session::{
22 SessionMapRef, WorkerProcessId, cancel_creating_jobs_in_session, cancel_queries_in_session,
23};
24
25pub(super) async fn handle_kill(handler_args: HandlerArgs, s: String) -> Result<RwPgResponse> {
26 let worker_process_id =
27 WorkerProcessId::try_from(s).map_err(ErrorCode::InvalidParameterValue)?;
28 let env = handler_args.session.env();
29 let this_worker_id = env.meta_client_ref().worker_id();
30 if this_worker_id == worker_process_id.worker_id {
31 return handle_kill_local(
32 handler_args.session.env().sessions_map().clone(),
33 worker_process_id.process_id,
34 )
35 .await;
36 }
37 let Some(worker) = handler_args
38 .session
39 .env()
40 .worker_node_manager_ref()
41 .worker_node(worker_process_id.worker_id)
42 else {
43 return Err(ErrorCode::InvalidParameterValue(format!(
44 "worker {} not found",
45 worker_process_id.worker_id
46 ))
47 .into());
48 };
49 let frontend_client = env.frontend_client_pool().get(&worker).await?;
50 frontend_client
51 .cancel_running_sql(CancelRunningSqlRequest {
52 process_id: worker_process_id.process_id,
53 })
54 .await
55 .map_err(TonicStatusWrapper::from)?;
56 Ok(PgResponse::empty_result(StatementType::KILL))
57}
58
59pub async fn handle_kill_local(
60 sessions_map: SessionMapRef,
61 process_id: i32,
62) -> Result<RwPgResponse> {
63 let session_id = (process_id, process_id);
65 tracing::trace!("kill query in session: {:?}", session_id);
66 let mut session_exists = cancel_queries_in_session(session_id, sessions_map.clone());
68 session_exists |= cancel_creating_jobs_in_session(session_id, sessions_map);
69
70 if session_exists {
71 Ok(PgResponse::empty_result(StatementType::KILL))
72 } else {
73 Err(ErrorCode::SessionNotFound.into())
74 }
75}