risingwave_frontend/handler/
cancel_job.rs1use itertools::Itertools;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_common::types::Fields;
18use risingwave_pb::id::JobId;
19use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
20use risingwave_sqlparser::ast::JobIdents;
21
22use super::RwPgResponseBuilderExt;
23use super::util::execute_with_long_running_notification;
24use crate::error::Result;
25use crate::handler::{HandlerArgs, RwPgResponse};
26
27pub(super) async fn handle_cancel(
28 handler_args: HandlerArgs,
29 jobs: JobIdents,
30) -> Result<RwPgResponse> {
31 let session = handler_args.session;
32 let job_ids = jobs.0.into_iter().map(JobId::from).collect_vec();
33
34 let canceled_jobs = if !job_ids.is_empty() {
35 execute_with_long_running_notification(
37 async {
38 session
39 .env()
40 .meta_client()
41 .cancel_creating_jobs(PbJobs::Ids(CreatingJobIds { job_ids }))
42 .await
43 .map_err(Into::into)
44 },
45 &session,
46 "CANCEL JOBS",
47 )
48 .await?
49 } else {
50 vec![]
51 };
52 let rows = canceled_jobs
53 .into_iter()
54 .map(|id| CancelRow { id: id.to_string() });
55 Ok(PgResponse::builder(StatementType::CANCEL_COMMAND)
56 .rows(rows)
57 .into())
58}
59
60#[derive(Fields)]
61#[fields(style = "Title Case")]
62struct CancelRow {
63 id: String,
64}