risingwave_frontend/handler/
cancel_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_common::types::Fields;
18use risingwave_pb::id::JobId;
19use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
20use risingwave_sqlparser::ast::JobIdents;
21
22use super::RwPgResponseBuilderExt;
23use super::util::execute_with_long_running_notification;
24use crate::error::Result;
25use crate::handler::{HandlerArgs, RwPgResponse};
26
27pub(super) async fn handle_cancel(
28    handler_args: HandlerArgs,
29    jobs: JobIdents,
30) -> Result<RwPgResponse> {
31    let session = handler_args.session;
32    let job_ids = jobs.0.into_iter().map(JobId::from).collect_vec();
33
34    let canceled_jobs = if !job_ids.is_empty() {
35        // Wrap in async block to convert RpcError to RwError
36        execute_with_long_running_notification(
37            async {
38                session
39                    .env()
40                    .meta_client()
41                    .cancel_creating_jobs(PbJobs::Ids(CreatingJobIds { job_ids }))
42                    .await
43                    .map_err(Into::into)
44            },
45            &session,
46            "CANCEL JOBS",
47        )
48        .await?
49    } else {
50        vec![]
51    };
52    let rows = canceled_jobs
53        .into_iter()
54        .map(|id| CancelRow { id: id.to_string() });
55    Ok(PgResponse::builder(StatementType::CANCEL_COMMAND)
56        .rows(rows)
57        .into())
58}
59
60#[derive(Fields)]
61#[fields(style = "Title Case")]
62struct CancelRow {
63    id: String,
64}