risingwave_frontend/handler/
cancel_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_common::types::Fields;
17use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
18use risingwave_sqlparser::ast::JobIdents;
19
20use super::RwPgResponseBuilderExt;
21use crate::error::Result;
22use crate::handler::{HandlerArgs, RwPgResponse};
23
24pub(super) async fn handle_cancel(
25    handler_args: HandlerArgs,
26    jobs: JobIdents,
27) -> Result<RwPgResponse> {
28    let session = handler_args.session;
29    let job_ids = jobs.0;
30    let mut filtered_job_ids = vec![];
31    let mut notices = vec![];
32    {
33        let catalog_reader = session.env().catalog_reader().read_guard();
34        for job_id in job_ids {
35            let database_catalog = catalog_reader.get_database_by_name(&session.database())?;
36            let sink_catalog = database_catalog
37                .iter_schemas()
38                .find_map(|schema| schema.get_sink_by_id(&job_id));
39            if let Some(sink_catalog) = sink_catalog {
40                if sink_catalog.is_created() {
41                    continue; // Skip already created sinks
42                } else if sink_catalog.target_table.is_some() {
43                    notices.push(format!(
44                        "Please use `DROP SINK {}` to cancel sink into table job.",
45                        sink_catalog.name
46                    ));
47                    continue;
48                }
49            }
50            filtered_job_ids.push(job_id);
51        }
52    }
53
54    let mut response_builder = RwPgResponse::builder(StatementType::CANCEL_COMMAND);
55    for notice in notices {
56        response_builder = response_builder.notice(notice);
57    }
58
59    let canceled_jobs = if !filtered_job_ids.is_empty() {
60        session
61            .env()
62            .meta_client()
63            .cancel_creating_jobs(PbJobs::Ids(CreatingJobIds {
64                job_ids: filtered_job_ids,
65            }))
66            .await?
67    } else {
68        vec![]
69    };
70    let rows = canceled_jobs
71        .into_iter()
72        .map(|id| CancelRow { id: id.to_string() });
73    Ok(response_builder.rows(rows).into())
74}
75
/// One output row of the `CANCEL` response built by `handle_cancel`.
///
/// The row layout is generated by the `Fields` derive; the `style = "Title Case"`
/// attribute presumably controls how the column name is rendered (e.g. `Id`) — confirm
/// against the `Fields` derive documentation.
#[derive(Fields)]
#[fields(style = "Title Case")]
struct CancelRow {
    /// Id of a canceled job, formatted with `to_string()` from the value returned by the
    /// meta client.
    id: String,
}