risingwave_frontend/handler/
cancel_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_common::types::Fields;
17use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
18use risingwave_sqlparser::ast::JobIdents;
19
20use super::RwPgResponseBuilderExt;
21use super::util::execute_with_long_running_notification;
22use crate::error::Result;
23use crate::handler::{HandlerArgs, RwPgResponse};
24
25pub(super) async fn handle_cancel(
26    handler_args: HandlerArgs,
27    jobs: JobIdents,
28) -> Result<RwPgResponse> {
29    let session = handler_args.session;
30    let job_ids = jobs.0;
31    let mut filtered_job_ids = vec![];
32    let mut notices = vec![];
33    {
34        let catalog_reader = session.env().catalog_reader().read_guard();
35        for job_id in job_ids {
36            let database_catalog = catalog_reader.get_database_by_name(&session.database())?;
37            let sink_catalog = database_catalog
38                .iter_schemas()
39                .find_map(|schema| schema.get_sink_by_id(&job_id));
40            if let Some(sink_catalog) = sink_catalog {
41                if sink_catalog.is_created() {
42                    continue; // Skip already created sinks
43                } else if sink_catalog.target_table.is_some() {
44                    notices.push(format!(
45                        "Please use `DROP SINK {}` to cancel sink into table job.",
46                        sink_catalog.name
47                    ));
48                    continue;
49                }
50            }
51            filtered_job_ids.push(job_id.into());
52        }
53    }
54
55    let mut response_builder = RwPgResponse::builder(StatementType::CANCEL_COMMAND);
56    for notice in notices {
57        response_builder = response_builder.notice(notice);
58    }
59
60    let canceled_jobs = if !filtered_job_ids.is_empty() {
61        // Wrap in async block to convert RpcError to RwError
62        execute_with_long_running_notification(
63            async {
64                session
65                    .env()
66                    .meta_client()
67                    .cancel_creating_jobs(PbJobs::Ids(CreatingJobIds {
68                        job_ids: filtered_job_ids,
69                    }))
70                    .await
71                    .map_err(Into::into)
72            },
73            &session,
74            "CANCEL JOBS",
75        )
76        .await?
77    } else {
78        vec![]
79    };
80    let rows = canceled_jobs
81        .into_iter()
82        .map(|id| CancelRow { id: id.to_string() });
83    Ok(response_builder.rows(rows).into())
84}
85
86#[derive(Fields)]
87#[fields(style = "Title Case")]
88struct CancelRow {
89    id: String,
90}