risingwave_frontend/handler/
kill_process.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::error::tonic::TonicStatusWrapper;
17use risingwave_pb::frontend_service::CancelRunningSqlRequest;
18
19use crate::error::{ErrorCode, Result};
20use crate::handler::{HandlerArgs, RwPgResponse};
21use crate::session::{SessionMapRef, WorkerProcessId, cancel_queries_in_session};
22
23pub(super) async fn handle_kill(handler_args: HandlerArgs, s: String) -> Result<RwPgResponse> {
24 let worker_process_id =
25 WorkerProcessId::try_from(s).map_err(ErrorCode::InvalidParameterValue)?;
26 let env = handler_args.session.env();
27 let this_worker_id = env.meta_client_ref().worker_id();
28 if this_worker_id == worker_process_id.worker_id {
29 return handle_kill_local(
30 handler_args.session.env().sessions_map().clone(),
31 worker_process_id.process_id,
32 )
33 .await;
34 }
35 let Some(worker) = handler_args
36 .session
37 .env()
38 .worker_node_manager_ref()
39 .worker_node(worker_process_id.worker_id)
40 else {
41 return Err(ErrorCode::InvalidParameterValue(format!(
42 "worker {} not found",
43 worker_process_id.worker_id
44 ))
45 .into());
46 };
47 let frontend_client = env.frontend_client_pool().get(&worker).await?;
48 frontend_client
49 .cancel_running_sql(CancelRunningSqlRequest {
50 process_id: worker_process_id.process_id,
51 })
52 .await
53 .map_err(TonicStatusWrapper::from)?;
54 Ok(PgResponse::empty_result(StatementType::KILL))
55}
56
57pub async fn handle_kill_local(
58 sessions_map: SessionMapRef,
59 process_id: i32,
60) -> Result<RwPgResponse> {
61 let session_id = (process_id, process_id);
63 tracing::trace!("kill query in session: {:?}", session_id);
64 let session_exists = cancel_queries_in_session(session_id, sessions_map.clone());
66 let session = { sessions_map.read().get(&session_id).cloned() };
67 let Some(session) = session else {
68 if session_exists {
69 return Ok(PgResponse::empty_result(StatementType::KILL));
70 }
71 return Err(ErrorCode::SessionNotFound.into());
72 };
73 session
74 .env()
75 .creating_streaming_job_tracker()
76 .cancel_jobs(session_id)
77 .await
78 .map_err(crate::error::RwError::from)?;
79 Ok(PgResponse::empty_result(StatementType::KILL))
80}