risingwave_frontend/handler/
cancel_job.rs1use itertools::Itertools;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_common::types::Fields;
18use risingwave_pb::id::JobId;
19use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
20use risingwave_sqlparser::ast::JobIdents;
21
22use super::RwPgResponseBuilderExt;
23use super::util::{LongRunningNotificationAction, execute_with_long_running_notification};
24use crate::error::Result;
25use crate::handler::{HandlerArgs, RwPgResponse};
26
27pub(super) async fn handle_cancel(
28 handler_args: HandlerArgs,
29 jobs: JobIdents,
30) -> Result<RwPgResponse> {
31 let session = handler_args.session;
32 let job_ids = jobs.0.into_iter().map(JobId::from).collect_vec();
33
34 let canceled_jobs = if !job_ids.is_empty() {
35 execute_with_long_running_notification(
37 async {
38 session
39 .env()
40 .meta_client()
41 .cancel_creating_jobs(PbJobs::Ids(CreatingJobIds { job_ids }))
42 .await
43 .map_err(Into::into)
44 },
45 &session,
46 "CANCEL JOBS",
47 LongRunningNotificationAction::SuggestRecover,
48 )
49 .await?
50 } else {
51 vec![]
52 };
53 let rows = canceled_jobs
54 .into_iter()
55 .map(|id| CancelRow { id: id.to_string() });
56 Ok(PgResponse::builder(StatementType::CANCEL_COMMAND)
57 .rows(rows)
58 .into())
59}
60
61#[derive(Fields)]
62#[fields(style = "Title Case")]
63struct CancelRow {
64 id: String,
65}